Dagster Integration API
metaxy.ext.dagster
Decorators
metaxy.ext.dagster.metaxify.metaxify
metaxify(_asset: _T | None = None, *, key: CoercibleToAssetKey | None = None, key_prefix: CoercibleToAssetKeyPrefix | None = None, inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, inject_column_schema: bool = True, inject_column_lineage: bool = True)
Inject Metaxy metadata into a Dagster AssetsDefinition or AssetSpec.
Affects assets with metaxy/feature metadata set.
Learn more about @metaxify and see example screenshots here.
Parameters:
- key (CoercibleToAssetKey | None, default: None) – Explicit asset key that overrides all other key resolution logic. Cannot be used with key_prefix or with multi-asset definitions that produce multiple outputs.
- key_prefix (CoercibleToAssetKeyPrefix | None, default: None) – Prefix to prepend to the resolved asset key. Also applied to upstream dependency keys. Cannot be used with key.
- inject_metaxy_kind (bool, default: True) – Whether to inject the "metaxy" kind into asset kinds.
- inject_code_version (bool, default: True) – Whether to inject the Metaxy feature code version into the asset's code version. The version is appended in the format metaxy:<version>.
- set_description (bool, default: True) – Whether to set the asset description from the feature class docstring if the asset doesn't already have a description.
- inject_column_schema (bool, default: True) – Whether to inject Pydantic field definitions as Dagster column schema. Field types are converted to strings, and field descriptions are used as column descriptions.
- inject_column_lineage (bool, default: True) – Whether to inject column-level lineage into the asset metadata under dagster/column_lineage. Uses Pydantic model fields to track column provenance via FeatureDep.rename, FeatureDep.lineage, and direct pass-through.
Tip
Multiple Dagster assets can contribute to the same Metaxy feature. This is a perfectly valid setup since Metaxy writes are append-only. In order to do this, set the following metadata keys:
- `"metaxy/feature"` pointing to the same Metaxy feature key
- `"metaxy/partition"` should be set to a dictionary mapping column names to values produced by the specific Dagster asset
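As a rough illustration of how a `"metaxy/partition"` mapping could carve out each asset's slice of a shared feature, the sketch below filters plain dictionaries by column equality. The helper `rows_for_partition`, the feature key `"video/embeddings"`, and the `source` column are hypothetical; Metaxy builds its real filters internally and may behave differently.

```python
from typing import Any

# Hypothetical metadata for two Dagster assets that write to one Metaxy feature.
ASSET_A_METADATA = {
    "metaxy/feature": "video/embeddings",
    "metaxy/partition": {"source": "camera_a"},
}
ASSET_B_METADATA = {
    "metaxy/feature": "video/embeddings",
    "metaxy/partition": {"source": "camera_b"},
}


def rows_for_partition(
    rows: list[dict[str, Any]], partition: dict[str, Any]
) -> list[dict[str, Any]]:
    """Keep rows whose columns equal every value in the partition mapping."""
    return [
        row
        for row in rows
        if all(row.get(col) == value for col, value in partition.items())
    ]


# Rows of the shared feature, as appended by both assets.
feature_rows = [
    {"id": 1, "source": "camera_a"},
    {"id": 2, "source": "camera_b"},
    {"id": 3, "source": "camera_a"},
]
rows_a = rows_for_partition(feature_rows, ASSET_A_METADATA["metaxy/partition"])
rows_b = rows_for_partition(feature_rows, ASSET_B_METADATA["metaxy/partition"])
```

Both assets point at the same feature key, and the mapping is what tells their rows apart.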
Example
With @multi_asset
Multiple Metaxy features can be produced by the same @multi_asset. Typically, they are produced independently of each other.
With dagster.AssetSpec
Multiple Dagster assets contributing to the same Metaxy feature
Source code in src/metaxy/ext/dagster/metaxify.py
def __init__(
    self,
    _asset: "_T | None" = None,
    *,
    key: CoercibleToAssetKey | None = None,
    key_prefix: CoercibleToAssetKeyPrefix | None = None,
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
) -> None:
    # Actual initialization happens in __new__, but we set defaults here for type checkers
    self.key = dg.AssetKey.from_coercible(key) if key is not None else None
    self.key_prefix = (
        dg.AssetKey.from_coercible(key_prefix) if key_prefix is not None else None
    )
    self.inject_metaxy_kind = inject_metaxy_kind
    self.inject_code_version = inject_code_version
    self.set_description = set_description
    self.inject_column_schema = inject_column_schema
    self.inject_column_lineage = inject_column_lineage
metaxy.ext.dagster.observable.observable_metaxy_asset
observable_metaxy_asset(feature: CoercibleToFeatureKey, *, store_resource_key: str = 'store', inject_metaxy_kind: bool = True, inject_code_version: bool = True, set_description: bool = True, **observable_kwargs: Any)
Decorator to create an observable source asset for a Metaxy feature.
The observation reads the feature's metadata from the store, counts rows,
and uses mean(metaxy_created_at) as the data version to track changes.
Using mean ensures that both additions and deletions are detected.
The decorated function receives (context, store, lazy_df) and can return
a dict of additional metadata to include in the observation.
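To see why a mean-based version reacts to deletions as well as additions, consider this minimal sketch using only the standard library. The helper `mean_timestamp_version` is a hypothetical stand-in, not Metaxy's implementation, and the timestamps are made up.

```python
from datetime import datetime, timezone
from statistics import fmean


def mean_timestamp_version(created_at: list[datetime]) -> str:
    """Hypothetical helper: turn the mean creation timestamp into a version string."""
    if not created_at:
        return "empty"
    return f"{fmean([ts.timestamp() for ts in created_at]):.6f}"


rows = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (1, 2, 3)]
with_addition = rows + [datetime(2024, 1, 10, tzinfo=timezone.utc)]
with_deletion = rows[1:]  # the oldest row was removed

version_before = mean_timestamp_version(rows)
version_after_addition = mean_timestamp_version(with_addition)
version_after_deletion = mean_timestamp_version(with_deletion)

# A max-based version would miss this deletion: the newest timestamp is unchanged.
max_unchanged_by_deletion = max(rows) == max(with_deletion)
```

In this sketch the mean shifts in both cases, whereas the maximum timestamp stays the same after the deletion.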
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature to observe.
- store_resource_key (str, default: 'store') – Resource key for the MetadataStore (default: "store").
- inject_metaxy_kind (bool, default: True) – Whether to inject the "metaxy" kind into asset kinds.
- inject_code_version (bool, default: True) – Whether to inject the Metaxy feature code version.
- set_description (bool, default: True) – Whether to set the description from the feature class docstring.
- **observable_kwargs (Any, default: {}) – Passed to @observable_source_asset (key, group_name, tags, metadata, description, partitions_def, etc.).
Example
import narwhals as nw

import metaxy.ext.dagster as mxd
from myproject.features import ExternalFeature

@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data(context, store, lazy_df):
    pass

# With custom metadata - return a dict
@mxd.observable_metaxy_asset(feature=ExternalFeature)
def external_data_with_metrics(context, store, lazy_df):
    # Run aggregations in the database
    total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
    return {"custom/total": total}
Note
observable_source_asset does not support deps. Upstream Metaxy feature
dependencies from the feature spec are not propagated to the SourceAsset.
Source code in src/metaxy/ext/dagster/observable.py
def observable_metaxy_asset(
    feature: mx.CoercibleToFeatureKey,
    *,
    store_resource_key: str = "store",
    # metaxify kwargs
    inject_metaxy_kind: bool = True,
    inject_code_version: bool = True,
    set_description: bool = True,
    # observable_source_asset kwargs
    **observable_kwargs: Any,
):
    """Decorator to create an observable source asset for a Metaxy feature.

    The observation reads the feature's metadata from the store, counts rows,
    and uses `mean(metaxy_created_at)` as the data version to track changes.
    Using mean ensures that both additions and deletions are detected.

    The decorated function receives `(context, store, lazy_df)` and can return
    a dict of additional metadata to include in the observation.

    Args:
        feature: The Metaxy feature to observe.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        inject_metaxy_kind: Whether to inject `"metaxy"` kind into asset kinds.
        inject_code_version: Whether to inject the Metaxy feature code version.
        set_description: Whether to set description from feature class docstring.
        **observable_kwargs: Passed to `@observable_source_asset`
            (key, group_name, tags, metadata, description, partitions_def, etc.)

    Example:
        ```python
        import narwhals as nw

        import metaxy.ext.dagster as mxd
        from myproject.features import ExternalFeature

        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data(context, store, lazy_df):
            pass

        # With custom metadata - return a dict
        @mxd.observable_metaxy_asset(feature=ExternalFeature)
        def external_data_with_metrics(context, store, lazy_df):
            # Run aggregations in the database
            total = lazy_df.select(nw.col("value").sum()).collect().item(0, 0)
            return {"custom/total": total}
        ```

    Note:
        `observable_source_asset` does not support `deps`. Upstream Metaxy feature
        dependencies from the feature spec are not propagated to the SourceAsset.
    """
    feature_key = mx.coerce_to_feature_key(feature)

    def decorator(fn: Callable[..., Any]) -> dg.SourceAsset:
        # Build an AssetSpec from kwargs and enrich with metaxify
        # Merge user metadata with metaxy/feature
        user_metadata = observable_kwargs.pop("metadata", None) or {}
        spec = dg.AssetSpec(
            key=observable_kwargs.pop("key", None) or fn.__name__,  # ty: ignore[unresolved-attribute]
            group_name=observable_kwargs.pop("group_name", None),
            tags=observable_kwargs.pop("tags", None),
            metadata={
                **user_metadata,
                DAGSTER_METAXY_FEATURE_METADATA_KEY: feature_key.to_string(),
            },
            description=observable_kwargs.pop("description", None),
        )
        enriched = metaxify(
            inject_metaxy_kind=inject_metaxy_kind,
            inject_code_version=inject_code_version,
            set_description=set_description,
        )(spec)

        def _observe(context: dg.AssetExecutionContext) -> dg.ObserveResult:
            store: mx.MetadataStore = getattr(context.resources, store_resource_key)
            # Check for metaxy/partition metadata to apply filtering
            metaxy_partition = enriched.metadata.get(
                DAGSTER_METAXY_PARTITION_METADATA_KEY
            )
            filters = build_metaxy_partition_filter(metaxy_partition)
            with store:
                lazy_df = store.read_metadata(feature_key, filters=filters)
                stats = compute_stats_from_lazy_frame(lazy_df)
                # Call the user's function - it can return additional metadata
                extra_metadata = fn(context, store, lazy_df) or {}
            metadata: dict[str, Any] = {"dagster/row_count": stats.row_count}
            metadata.update(extra_metadata)
            return dg.ObserveResult(
                data_version=stats.data_version,
                metadata=metadata,
                tags=build_feature_event_tags(feature_key),
            )

        # Apply observable_source_asset decorator
        return dg.observable_source_asset(
            key=enriched.key,
            description=enriched.description,
            group_name=enriched.group_name,
            tags=dict(enriched.tags) if enriched.tags else None,
            metadata=dict(enriched.metadata) if enriched.metadata else None,
            required_resource_keys={store_resource_key},
            **observable_kwargs,
        )(_observe)

    return decorator
IO Manager
metaxy.ext.dagster.MetaxyIOManager
Bases: ConfigurableIOManager
MetaxyIOManager is a Dagster IOManager that reads and writes data to/from Metaxy's MetadataStore.
It automatically attaches Metaxy feature and store metadata to Dagster materialization events and handles partitioned assets.
Always set "metaxy/feature" Dagster metadata
This IOManager uses the "metaxy/feature" Dagster metadata key to map Dagster assets to Metaxy features.
The key must be set on every asset that is loaded or materialized.
Defining Partitioned Assets
To tell Metaxy which column to use when filtering partitioned assets, set the "partition_by" Dagster metadata key.
Example
This key is commonly used to configure partitioning behavior by various Dagster IO managers.
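The effect of "partition_by" can be pictured with a small standard-library sketch: the metadata names a column, and loading keeps only the rows whose value in that column belongs to the partitions being processed. The helper `filter_to_partitions`, the feature key `"sales/daily"`, and the `date` column are assumptions made for illustration; the actual filtering is done by the IO manager against the metadata store.

```python
from typing import Any, Iterable

# Asset metadata as described above; the feature key and column name are made up.
asset_metadata = {"metaxy/feature": "sales/daily", "partition_by": "date"}


def filter_to_partitions(
    rows: list[dict[str, Any]],
    metadata: dict[str, Any],
    partition_keys: Iterable[str],
) -> list[dict[str, Any]]:
    """Keep rows whose partition column matches one of the requested partition keys."""
    column = metadata.get("partition_by")
    if column is None:
        # No partition column configured: nothing to filter on.
        return list(rows)
    wanted = set(partition_keys)
    return [row for row in rows if row[column] in wanted]


rows = [
    {"id": 1, "date": "2024-01-01"},
    {"id": 2, "date": "2024-01-02"},
    {"id": 3, "date": "2024-01-02"},
]
single_day = filter_to_partitions(rows, asset_metadata, ["2024-01-02"])
unfiltered = filter_to_partitions(rows, {"metaxy/feature": "sales/daily"}, ["2024-01-02"])
```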
Functions
metaxy.ext.dagster.MetaxyIOManager.load_input
load_input(context: InputContext) -> LazyFrame[Any]
Load feature metadata from MetadataStore.
Reads metadata for the feature specified in the asset's "metaxy/feature" metadata.
For partitioned assets, filters to the current partition using the column specified
in "partition_by" metadata.
Parameters:
- context (InputContext) – Dagster input context containing asset metadata.
Returns:
- LazyFrame[Any] – A narwhals LazyFrame with the feature metadata.
Source code in src/metaxy/ext/dagster/io_manager.py
def load_input(self, context: "dg.InputContext") -> nw.LazyFrame[Any]:
    """Load feature metadata from [`MetadataStore`][metaxy.MetadataStore].

    Reads metadata for the feature specified in the asset's `"metaxy/feature"` metadata.
    For partitioned assets, filters to the current partition using the column specified
    in `"partition_by"` metadata.

    Args:
        context: Dagster input context containing asset metadata.

    Returns:
        A narwhals LazyFrame with the feature metadata.
    """
    with self.metadata_store:
        feature_key = self._feature_key_from_context(context)
        store_metadata = self.metadata_store.get_store_metadata(feature_key)
        # Build input metadata, transforming special keys to dagster standard format
        input_metadata: dict[str, Any] = {}
        for key, value in store_metadata.items():
            if key == "display":
                input_metadata["metaxy/store"] = value
            elif key == "table_name":
                input_metadata["dagster/table_name"] = value
            elif key == "uri":
                input_metadata["dagster/uri"] = dg.MetadataValue.path(value)
            else:
                input_metadata[key] = value
        # Only add input metadata if we have exactly one partition key
        # (add_input_metadata internally uses asset_partition_key which fails with multiple)
        # TODO: raise an issue in Dagster
        # or implement our own observation logging for multiple partition keys
        has_single_partition = (
            context.has_asset_partitions
            and len(list(context.asset_partition_keys)) == 1
        )
        if input_metadata and (
            not context.has_asset_partitions or has_single_partition
        ):
            context.add_input_metadata(
                input_metadata, description="Metadata Store Info"
            )
        # Build partition filters from context (handles partition_by and metaxy/partition)
        filters = build_partition_filter_from_input_context(context)
        return self.metadata_store.read_metadata(
            feature=feature_key,
            filters=filters,
        )
metaxy.ext.dagster.MetaxyIOManager.handle_output
handle_output(context: OutputContext, obj: MetaxyOutput) -> None
Write feature metadata to MetadataStore.
Writes the output dataframe to the metadata store for the feature specified
in the asset's "metaxy/feature" metadata. Also logs metadata about the
feature and store to Dagster's materialization events.
If obj is None, only metadata logging is performed (no data is written).
Parameters:
- context (OutputContext) – Dagster output context containing asset metadata.
- obj (MetaxyOutput) – A narwhals-compatible dataframe to write, or None to skip writing.
Source code in src/metaxy/ext/dagster/io_manager.py
def handle_output(self, context: "dg.OutputContext", obj: MetaxyOutput) -> None:
    """Write feature metadata to [`MetadataStore`][metaxy.MetadataStore].

    Writes the output dataframe to the metadata store for the feature specified
    in the asset's `"metaxy/feature"` metadata. Also logs metadata about the
    feature and store to Dagster's materialization events.

    If `obj` is `None`, only metadata logging is performed (no data is written).

    Args:
        context: Dagster output context containing asset metadata.
        obj: A narwhals-compatible dataframe to write, or None to skip writing.
    """
    assert DAGSTER_METAXY_FEATURE_METADATA_KEY in context.definition_metadata, (
        f'Missing `"{DAGSTER_METAXY_FEATURE_METADATA_KEY}"` key in asset metadata'
    )
    key = self._feature_key_from_context(context)
    feature = mx.get_feature_by_key(key)
    if obj is not None:
        context.log.debug(
            f'Writing metadata for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
        with self.metadata_store.open("write"):
            self.metadata_store.write_metadata(feature=feature, df=obj)
        context.log.debug(
            f'Metadata written for Metaxy feature "{key.to_string()}" into {self.metadata_store.display()}'
        )
    else:
        context.log.debug(
            f'The output corresponds to Metaxy feature "{key.to_string()}" stored in {self.metadata_store.display()}'
        )
    self._log_output_metadata(context)
Dagster Types
metaxy.ext.dagster.dagster_type.feature_to_dagster_type
feature_to_dagster_type(feature: CoercibleToFeatureKey, *, name: str | None = None, description: str | None = None, inject_column_schema: bool = True, inject_column_lineage: bool = True, metadata: Mapping[str, Any] | None = None) -> DagsterType
Build a Dagster type from a Metaxy feature.
Creates a dagster.DagsterType that validates outputs are
MetaxyOutput (i.e., narwhals-compatible
dataframes or None) and includes metadata derived from the feature's Pydantic
model fields.
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature to create a type for. Can be a feature class, feature key, or string that can be coerced to a feature key.
- name (str | None, default: None) – Optional custom name for the DagsterType. Defaults to the feature's table name (e.g., "project__feature_name").
- description (str | None, default: None) – Optional custom description. Defaults to the feature class docstring or a generated description.
- inject_column_schema (bool, default: True) – Whether to inject the column schema as metadata. The schema is derived from Pydantic model fields.
- inject_column_lineage (bool, default: True) – Whether to inject column lineage as metadata. The lineage is derived from feature dependencies.
- metadata (Mapping[str, Any] | None, default: None) – Optional custom metadata to inject into the DagsterType.
Returns:
- DagsterType – A DagsterType configured for the Metaxy feature with appropriate type checking and metadata.
Tip
This is automatically injected by @metaxify
Example
import dagster as dg
import polars as pl
import metaxy.ext.dagster as mxd
from myproject.features import MyFeature  # Your Metaxy feature class

# metaxify locates the feature via the "metaxy/feature" metadata key;
# "my/feature" is a placeholder for MyFeature's key.
@mxd.metaxify()
@dg.asset(
    metadata={"metaxy/feature": "my/feature"},
    dagster_type=mxd.feature_to_dagster_type(MyFeature),
)
def my_asset():
    return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
See also
- metaxify: Decorator for injecting Metaxy metadata into Dagster assets.
- MetaxyOutput: The type alias for valid Metaxy outputs.
Source code in src/metaxy/ext/dagster/dagster_type.py
def feature_to_dagster_type(
    feature: mx.CoercibleToFeatureKey,
    *,
    name: str | None = None,
    description: str | None = None,
    inject_column_schema: bool = True,
    inject_column_lineage: bool = True,
    metadata: Mapping[str, Any] | None = None,
) -> dg.DagsterType:
    """Build a Dagster type from a Metaxy feature.

    Creates a `dagster.DagsterType` that validates outputs are
    [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput] (i.e., narwhals-compatible
    dataframes or `None`) and includes metadata derived from the feature's Pydantic
    model fields.

    Args:
        feature: The Metaxy feature to create a type for. Can be a feature class,
            feature key, or string that can be coerced to a feature key.
        name: Optional custom name for the DagsterType. Defaults to the feature's
            table name (e.g., "project__feature_name").
        description: Optional custom description. Defaults to the feature class
            docstring or a generated description.
        inject_column_schema: Whether to inject the column schema as metadata.
            The schema is derived from Pydantic model fields.
        inject_column_lineage: Whether to inject column lineage as metadata.
            The lineage is derived from feature dependencies.
        metadata: Optional custom metadata to inject into the DagsterType.

    Returns:
        A DagsterType configured for the Metaxy feature with appropriate
        type checking and metadata.

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        import dagster as dg
        import polars as pl
        import metaxy.ext.dagster as mxd
        from myproject.features import MyFeature  # Your Metaxy feature class

        # metaxify locates the feature via the "metaxy/feature" metadata key;
        # "my/feature" is a placeholder for MyFeature's key.
        @mxd.metaxify()
        @dg.asset(
            metadata={"metaxy/feature": "my/feature"},
            dagster_type=mxd.feature_to_dagster_type(MyFeature),
        )
        def my_asset():
            return pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
        ```

    !!! info "See also"
        - [`metaxify`][metaxy.ext.dagster.metaxify.metaxify]: Decorator for injecting
          Metaxy metadata into Dagster assets.
        - [`MetaxyOutput`][metaxy.ext.dagster.MetaxyOutput]: The type alias for valid
          Metaxy outputs.
    """
    from metaxy.ext.dagster.io_manager import MetaxyOutput

    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)
    # Determine name
    type_name = name or feature_key.table_name
    # Determine description
    if description is None:
        if feature_cls.__doc__:
            import inspect

            description = inspect.cleandoc(feature_cls.__doc__)
        else:
            description = f"Metaxy feature '{feature_key.to_string()}'."
    # Build metadata - start with custom metadata if provided
    final_metadata: dict[str, Any] = dict(metadata) if metadata else {}
    final_metadata[DAGSTER_METAXY_INFO_METADATA_KEY] = build_feature_info_metadata(
        feature_key
    )
    if inject_column_schema:
        column_schema = build_column_schema(feature_cls)
        if column_schema is not None:
            final_metadata[DAGSTER_COLUMN_SCHEMA_METADATA_KEY] = column_schema
    if inject_column_lineage:
        column_lineage = build_column_lineage(feature_cls)
        if column_lineage is not None:
            final_metadata[DAGSTER_COLUMN_LINEAGE_METADATA_KEY] = column_lineage
    dagster_type = dg.DagsterType(
        type_check_fn=_create_type_check_fn(feature_key),
        name=type_name,
        description=description,
        typing_type=MetaxyOutput,
        metadata=final_metadata,
    )
    return dagster_type
Dagster Event Generators
metaxy.ext.dagster.utils.generate_materialize_results
generate_materialize_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Iterable[AssetSpec] | None = None) -> Iterator[MaterializeResult[None]]
Generate dagster.MaterializeResult events for assets in topological order.
Yields a MaterializeResult for each asset spec, sorted by their associated
Metaxy features in topological order (dependencies before dependents).
Each result includes the row count as "dagster/row_count" metadata.
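The ordering guarantee (dependencies before dependents) can be sketched with the standard library's `graphlib`. The dependency mapping and the feature-to-asset mapping below are hypothetical; Metaxy derives the real order from its own feature graph.

```python
from graphlib import TopologicalSorter

# Hypothetical feature graph: each key maps to the features it depends on.
feature_deps = {
    "my/feature/a": set(),
    "my/feature/b": {"my/feature/a"},
    "my/feature/c": {"my/feature/a", "my/feature/b"},
}

# Hypothetical mapping from feature key to the Dagster asset key that produces it,
# deliberately listed out of order.
asset_by_feature = {
    "my/feature/c": "output_c",
    "my/feature/a": "output_a",
    "my/feature/b": "output_b",
}

# static_order() yields every node only after all of its predecessors.
sorted_features = list(TopologicalSorter(feature_deps).static_order())

# Events would then be emitted following the sorted feature keys.
emit_order = [
    asset_by_feature[key] for key in sorted_features if key in asset_by_feature
]
```

Emitting results in this order means an upstream asset's event is always reported before the events of the assets that depend on it.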
Parameters:
- context (AssetExecutionContext) – The Dagster asset execution context.
- store (MetadataStore | MetaxyStoreFromConfigResource) – The Metaxy metadata store to read from.
- specs (Iterable[AssetSpec] | None, default: None) – Optional, concrete Dagster asset specs. If missing, specs will be taken from the context.
Yields:
- MaterializeResult[None] – Materialization result for each asset in topological order.
Example
import dagster as dg
import metaxy as mx
from metaxy.ext.dagster.metaxify import metaxify
from metaxy.ext.dagster.utils import generate_materialize_results

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@metaxify
@dg.multi_asset(specs=specs)
def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    # ... compute and write data ...
    yield from generate_materialize_results(context, store)
Source code in src/metaxy/ext/dagster/utils.py
def generate_materialize_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.MaterializeResult[None]]:
    """Generate `dagster.MaterializeResult` events for assets in topological order.

    Yields a `MaterializeResult` for each asset spec, sorted by their associated
    Metaxy features in topological order (dependencies before dependents).
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Optional, concrete Dagster asset specs.
            If missing, specs will be taken from the context.

    Yields:
        Materialization result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_asset(specs=specs)
        def my_multi_asset(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            # ... compute and write data ...
            yield from generate_materialize_results(context, store)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    specs = specs or context.assets_def.specs
    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec
    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))
    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(
            DAGSTER_METAXY_PARTITION_METADATA_KEY
        )
        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(
                    f"Feature {key.to_string()} not found in store, skipping materialization result"
                )
                continue
            # Get materialized-in-run count if materialization_id is set
            if store.materialization_id is not None:  # ty: ignore[possibly-missing-attribute]
                mat_df = store.read_metadata(  # ty: ignore[possibly-missing-attribute]
                    key,
                    filters=[
                        nw.col(METAXY_MATERIALIZATION_ID) == store.materialization_id  # ty: ignore[possibly-missing-attribute]
                    ],
                )
                metadata["metaxy/materialized_in_run"] = (
                    mat_df.select(nw.len()).collect().item(0, 0)
                )
        yield dg.MaterializeResult(
            value=None,
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )
metaxy.ext.dagster.utils.generate_observe_results
generate_observe_results(context: AssetExecutionContext, store: MetadataStore | MetaxyStoreFromConfigResource, specs: Iterable[AssetSpec] | None = None) -> Iterator[ObserveResult]
Generate dagster.ObserveResult events for assets in topological order.
Yields an ObserveResult for each asset spec that has "metaxy/feature" metadata key set, sorted by their associated
Metaxy features in topological order.
Each result includes the row count as "dagster/row_count" metadata.
Parameters:
- context (AssetExecutionContext) – The Dagster asset execution context.
- store (MetadataStore | MetaxyStoreFromConfigResource) – The Metaxy metadata store to read from.
- specs (Iterable[AssetSpec] | None, default: None) – Optional, concrete Dagster asset specs. If missing, this function will take the current specs from the context.
Yields:
- ObserveResult – Observation result for each asset in topological order.
Example
import dagster as dg
import metaxy as mx
from metaxy.ext.dagster.metaxify import metaxify
from metaxy.ext.dagster.utils import generate_observe_results

specs = [
    dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
    dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
]

@metaxify
@dg.multi_observable_source_asset(specs=specs)
def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
    yield from generate_observe_results(context, store)
Source code in src/metaxy/ext/dagster/utils.py
def generate_observe_results(
    context: dg.AssetExecutionContext,
    store: mx.MetadataStore | MetaxyStoreFromConfigResource,
    specs: Iterable[dg.AssetSpec] | None = None,
) -> Iterator[dg.ObserveResult]:
    """Generate `dagster.ObserveResult` events for assets in topological order.

    Yields an `ObserveResult` for each asset spec that has `"metaxy/feature"` metadata key set, sorted by their associated
    Metaxy features in topological order.
    Each result includes the row count as `"dagster/row_count"` metadata.

    Args:
        context: The Dagster asset execution context.
        store: The Metaxy metadata store to read from.
        specs: Optional, concrete Dagster asset specs.
            If missing, this function will take the current specs from the context.

    Yields:
        Observation result for each asset in topological order.

    Example:
        ```python
        specs = [
            dg.AssetSpec("output_a", metadata={"metaxy/feature": "my/feature/a"}),
            dg.AssetSpec("output_b", metadata={"metaxy/feature": "my/feature/b"}),
        ]

        @metaxify
        @dg.multi_observable_source_asset(specs=specs)
        def my_observable_assets(context: dg.AssetExecutionContext, store: mx.MetadataStore):
            yield from generate_observe_results(context, store)
        ```
    """
    # Build mapping from feature key to asset spec
    spec_by_feature_key: dict[mx.FeatureKey, dg.AssetSpec] = {}
    specs = specs or context.assets_def.specs
    for spec in specs:
        if feature_key_raw := spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY):
            feature_key = mx.coerce_to_feature_key(feature_key_raw)
            spec_by_feature_key[feature_key] = spec
    # Sort by topological order of feature keys
    graph = mx.FeatureGraph.get_active()
    sorted_keys = graph.topological_sort_features(list(spec_by_feature_key.keys()))
    for key in sorted_keys:
        asset_spec = spec_by_feature_key[key]
        partition_col = asset_spec.metadata.get(DAGSTER_METAXY_PARTITION_KEY)
        metaxy_partition = asset_spec.metadata.get(
            DAGSTER_METAXY_PARTITION_METADATA_KEY
        )
        with store:  # ty: ignore[invalid-context-manager]
            try:
                # Build runtime metadata (handles reading, filtering, and stats internally)
                # For observers with no metaxy_partition, this reads all data
                metadata, stats = build_runtime_feature_metadata(
                    key,
                    store,
                    context,
                    partition_col=partition_col,
                    metaxy_partition=metaxy_partition,
                )
            except FeatureNotFoundError:
                context.log.exception(
                    f"Feature {key.to_string()} not found in store, skipping observation result"
                )
                continue
        yield dg.ObserveResult(
            asset_key=asset_spec.key,
            metadata=metadata,
            data_version=stats.data_version,
            tags=build_feature_event_tags(key),
        )
metaxy.ext.dagster.utils.build_feature_info_metadata
build_feature_info_metadata(feature: CoercibleToFeatureKey) -> dict[str, Any]
Build feature info metadata dict for Dagster assets.
Creates a dictionary with information about the Metaxy feature that can be
used as Dagster asset metadata under the "metaxy/feature_info" key.
Parameters:
- feature (CoercibleToFeatureKey) – The Metaxy feature (class, key, or string).
Returns:
- dict[str, Any] – A nested dictionary containing:
  - feature: Feature information
    - project: The project name
    - spec: The full feature spec as a dict (via model_dump())
    - version: The feature version string
    - type: The feature class module path
  - metaxy: Metaxy library information
    - version: The metaxy library version
    - plugins: The plugins listed in the active MetaxyConfig (present in the returned dictionary per the source code)
Tip
This is automatically injected by @metaxify
Example
from metaxy.ext.dagster.utils import build_feature_info_metadata

info = build_feature_info_metadata(MyFeature)
# {
#     "feature": {
#         "project": "my_project",
#         "spec": {...},  # Full FeatureSpec model_dump()
#         "version": "my__feature@abc123",
#         "type": "myproject.features",
#     },
#     "metaxy": {
#         "version": "0.1.0",
#     },
# }
Source code in src/metaxy/ext/dagster/utils.py
def build_feature_info_metadata(
    feature: mx.CoercibleToFeatureKey,
) -> dict[str, Any]:
    """Build feature info metadata dict for Dagster assets.

    Creates a dictionary with information about the Metaxy feature that can be
    used as Dagster asset metadata under the `"metaxy/feature_info"` key.

    Args:
        feature: The Metaxy feature (class, key, or string).

    Returns:
        A nested dictionary containing:

        - `feature`: Feature information
            - `project`: The project name
            - `spec`: The full feature spec as a dict (via `model_dump()`)
            - `version`: The feature version string
            - `type`: The feature class module path
        - `metaxy`: Metaxy library information
            - `version`: The metaxy library version

    !!! tip
        This is automatically injected by [`@metaxify`][metaxy.ext.dagster.metaxify.metaxify]

    Example:
        ```python
        from metaxy.ext.dagster.utils import build_feature_info_metadata

        info = build_feature_info_metadata(MyFeature)
        # {
        #     "feature": {
        #         "project": "my_project",
        #         "spec": {...},  # Full FeatureSpec model_dump()
        #         "version": "my__feature@abc123",
        #         "type": "myproject.features",
        #     },
        #     "metaxy": {
        #         "version": "0.1.0",
        #     },
        # }
        ```
    """
    feature_key = mx.coerce_to_feature_key(feature)
    feature_cls = mx.get_feature_by_key(feature_key)
    return {
        "feature": {
            "project": feature_cls.project,
            "spec": feature_cls.spec().model_dump(mode="json"),
            "version": feature_cls.feature_version(),
            "type": feature_cls.__module__,
        },
        "metaxy": {
            "version": mx.__version__,
            "plugins": mx.MetaxyConfig.get().plugins,
        },
    }
Observation Jobs
metaxy.ext.dagster.observation_job.build_metaxy_multi_observation_job
build_metaxy_multi_observation_job(name: str, *, asset_selection: AssetSelection | None = None, defs: Definitions | None = None, assets: Sequence[AssetSpec | AssetsDefinition | SourceAsset] | None = None, store_resource_key: str = 'store', tags: Mapping[str, str] | None = None, **kwargs: Any) -> JobDefinition
Build a dynamic Dagster job that observes multiple Metaxy feature assets.
Creates a job that dynamically spawns one op per asset, yielding
AssetObservation events.
Uses Dagster's dynamic orchestration to process multiple assets in parallel.
Tip
This is a very powerful way to observe all your Metaxy features at once. Use it in combination with a Dagster schedule to run it periodically.
Provide either:
- asset_selection and defs: Select assets from a Definitions object
- assets: Direct list of assets to observe
Note
All selected assets must share the same partitioning (if any).
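The two input modes and the shared-partitioning rule amount to a pair of checks, which the documentation says surface as ValueError. The sketch below shows one way such validation could look; `resolve_input_mode` and `check_shared_partitioning` are hypothetical helpers written for illustration and are not part of Metaxy's API.

```python
from typing import Any, Optional, Sequence


def resolve_input_mode(
    *,
    asset_selection: Optional[Any] = None,
    defs: Optional[Any] = None,
    assets: Optional[Sequence[Any]] = None,
) -> str:
    """Return which input mode applies, or raise ValueError for invalid combinations."""
    if assets is not None:
        if asset_selection is not None or defs is not None:
            raise ValueError(
                "Pass either `assets` or `asset_selection` with `defs`, not both."
            )
        return "assets"
    if asset_selection is None or defs is None:
        raise ValueError("`asset_selection` and `defs` must be provided together.")
    return "selection"


def check_shared_partitioning(partitions_defs: Sequence[Any]) -> Any:
    """Return the single partitioning shared by all assets (None if unpartitioned)."""
    distinct = []
    for partitions_def in partitions_defs:
        if partitions_def not in distinct:
            distinct.append(partitions_def)
    if len(distinct) > 1:
        raise ValueError("All selected assets must share the same partitions_def.")
    return distinct[0] if distinct else None


mode_direct = resolve_input_mode(assets=["feature_a", "feature_b"])
mode_selection = resolve_input_mode(asset_selection="kind:metaxy", defs=object())
shared = check_shared_partitioning(["daily", "daily"])
```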
Parameters:
-
name(str) –Name for the job.
-
asset_selection(AssetSelection | None, default:None) –An
AssetSelectionspecifying which assets to observe. Must be used together withdefs. -
defs(Definitions | None, default:None) –The
Definitionsobject to resolve the selection against. Must be used together withasset_selection. -
assets(Sequence[AssetSpec | AssetsDefinition | SourceAsset] | None, default:None) –Direct sequence of assets to observe. Each item can be an
AssetSpec,AssetsDefinition, orSourceAsset. Cannot be used together withasset_selection/defs. -
store_resource_key(str, default:'store') –Resource key for the MetadataStore (default:
"store"). -
tags(Mapping[str, str] | None, default:None) –Optional tags to apply to the job.
-
**kwargs(Any, default:{}) –Additional keyword arguments passed to the
@jobdecorator.
Returns:
-
JobDefinition–A Dagster job definition that observes all matching Metaxy assets.
Raises:
-
ValueError–If no specs have metaxy/feature metadata, if assets have inconsistent partitions_def, or if invalid argument combinations are provided.
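The argument rules above can be summarized as a small standalone check. The sketch below is a simplified model rather than the library's code: `check_observation_args` and `check_same_partitioning` are hypothetical helpers, and plain Python values stand in for the Dagster objects. It mirrors the documented constraints: exactly one of the two input modes must be used, and all selected assets must share one partitioning.

```python
from typing import Any, Optional, Sequence


def check_observation_args(
    asset_selection: Optional[Any] = None,
    defs: Optional[Any] = None,
    assets: Optional[Sequence[Any]] = None,
) -> str:
    """Return the input mode in use, or raise ValueError (hypothetical helper)."""
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None
    if has_selection and has_assets:
        raise ValueError("use either asset_selection + defs, or assets alone")
    if not has_selection and not has_assets:
        raise ValueError("provide asset_selection + defs, or assets")
    if has_selection and (asset_selection is None or defs is None):
        raise ValueError("asset_selection and defs must be given together")
    return "selection" if has_selection else "assets"


def check_same_partitioning(partitions_defs: Sequence[Any]) -> Any:
    """Return the shared partitioning, or raise ValueError if entries differ."""
    first = partitions_defs[0]
    for index, pdef in enumerate(partitions_defs[1:], start=1):
        if pdef != first:
            raise ValueError(f"asset 0 has {first}, but asset {index} has {pdef}")
    return first
```

Note that an empty `assets` list still counts as "assets provided" in this model, because the check is `is not None` rather than truthiness.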
Example
import dagster as dg
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_a"})
def feature_a():
    ...

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature_b"})
def feature_b():
    ...

# Option 1: Using asset_selection + defs
my_defs = dg.Definitions(assets=[feature_a, feature_b])
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    asset_selection=dg.AssetSelection.kind("metaxy"),
    defs=my_defs,
)

# Option 2: Using direct assets list
observation_job = mxd.build_metaxy_multi_observation_job(
    name="observe_my_features",
    assets=[feature_a, feature_b],
)
Source code in src/metaxy/ext/dagster/observation_job.py
def build_metaxy_multi_observation_job(
    name: str,
    *,
    asset_selection: dg.AssetSelection | None = None,
    defs: dg.Definitions | None = None,
    assets: Sequence[dg.AssetSpec | dg.AssetsDefinition | dg.SourceAsset] | None = None,
    store_resource_key: str = "store",
    tags: Mapping[str, str] | None = None,
    **kwargs: Any,
) -> dg.JobDefinition:
    """Build a dynamic Dagster job that observes multiple Metaxy feature assets.

    Creates a job that dynamically spawns one op per asset, yielding
    [`AssetObservation`](https://docs.dagster.io/api/python-api/ops#dagster.AssetObservation) events.
    Uses Dagster's dynamic orchestration to process multiple assets in parallel.

    !!! tip
        This is a very powerful way to observe all your Metaxy features at once.
        Use it in combination with a [Dagster schedule](https://docs.dagster.io/concepts/schedules)
        to run it periodically.

    Provide either:

    - `asset_selection` and `defs`: Select assets from a
      [`Definitions`](https://docs.dagster.io/api/python-api/definitions#dagster.Definitions) object
    - `assets`: Direct list of assets to observe

    !!! note
        All selected assets must share the same partitioning (if any).

    Args:
        name: Name for the job.
        asset_selection: An `AssetSelection` specifying which assets to observe.
            Must be used together with `defs`.
        defs: The `Definitions` object to resolve the selection against.
            Must be used together with `asset_selection`.
        assets: Direct sequence of assets to observe. Each item can be an
            `AssetSpec`, `AssetsDefinition`, or `SourceAsset`.
            Cannot be used together with `asset_selection`/`defs`.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job.
        **kwargs: Additional keyword arguments passed to the
            [`@job`](https://docs.dagster.io/api/python-api/jobs#dagster.job) decorator.

    Returns:
        A Dagster job definition that observes all matching Metaxy assets.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata, if assets have
            inconsistent `partitions_def`, or if invalid argument combinations
            are provided.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_a"})
        def feature_a():
            ...

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature_b"})
        def feature_b():
            ...

        # Option 1: Using asset_selection + defs
        my_defs = dg.Definitions(assets=[feature_a, feature_b])
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            asset_selection=dg.AssetSelection.kind("metaxy"),
            defs=my_defs,
        )

        # Option 2: Using direct assets list
        observation_job = mxd.build_metaxy_multi_observation_job(
            name="observe_my_features",
            assets=[feature_a, feature_b],
        )
        ```
    """
    tags = tags or {}

    # Validate argument combinations
    has_selection = asset_selection is not None or defs is not None
    has_assets = assets is not None
    if has_selection and has_assets:
        raise ValueError(
            "Cannot provide both 'assets' and 'asset_selection'/'defs'. "
            "Use either asset_selection + defs, or assets alone."
        )
    if not has_selection and not has_assets:
        raise ValueError("Must provide either 'asset_selection' + 'defs', or 'assets'.")

    if has_selection:
        if asset_selection is None:
            raise ValueError("'defs' requires 'asset_selection' to be provided.")
        if defs is None:
            raise ValueError("'asset_selection' requires 'defs' to be provided.")

        # Resolve selection using defs
        all_assets_defs = list(defs.resolve_asset_graph().assets_defs)
        selected_keys = asset_selection.resolve(all_assets_defs)

        # Get specs for selected keys, with partitions_def
        metaxy_specs: list[dg.AssetSpec] = []
        partitions_defs: list[dg.PartitionsDefinition | None] = []
        for asset_def in all_assets_defs:
            for spec in asset_def.specs:
                if spec.key in selected_keys:
                    if (
                        spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
                        is not None
                    ):
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset_def.partitions_def)
    else:
        # Direct assets list
        assert assets is not None
        metaxy_specs = []
        partitions_defs = []
        for asset in assets:
            if isinstance(asset, dg.AssetSpec):
                if asset.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None:
                    metaxy_specs.append(asset)
                    partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.AssetsDefinition):
                for spec in asset.specs:
                    if (
                        spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY)
                        is not None
                    ):
                        metaxy_specs.append(spec)
                        partitions_defs.append(asset.partitions_def)
            elif isinstance(asset, dg.SourceAsset):
                # SourceAsset doesn't have metaxy/feature metadata typically
                pass
            else:
                raise TypeError(
                    f"Expected AssetSpec, AssetsDefinition, or SourceAsset, "
                    f"got {type(asset).__name__}"
                )

    if not metaxy_specs:
        raise ValueError(
            "No assets have specs with 'metaxy/feature' metadata. "
            "Ensure your assets have metadata={'metaxy/feature': 'feature/key'}."
        )

    # Validate all specs have the same partitions_def
    first_partitions_def = partitions_defs[0]
    for i, pdef in enumerate(partitions_defs[1:], start=1):
        if pdef != first_partitions_def:
            raise ValueError(
                f"All assets must have the same partitions_def. "
                f"Asset 0 has {first_partitions_def}, but asset {i} has {pdef}."
            )
    partitions_def = first_partitions_def

    # Build feature keys for description (may have duplicates when multiple assets share a feature)
    feature_keys = [
        mx.coerce_to_feature_key(spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY])
        for spec in metaxy_specs
    ]

    # Build a mapping of asset key -> spec for the dynamic op
    # This ensures each asset gets its own op, even if multiple assets share the same feature
    spec_by_asset_key = {spec.key.to_user_string(): spec for spec in metaxy_specs}

    # Op that emits dynamic outputs for each asset
    @dg.op(name=f"{name}_fanout", out=dg.DynamicOut(str))
    def fanout_assets() -> Any:
        for asset_key_str in spec_by_asset_key:
            # Use asset key (with / replaced by __) as mapping key for Dagster identifiers
            safe_mapping_key = asset_key_str.replace("/", "__")
            yield dg.DynamicOutput(asset_key_str, mapping_key=safe_mapping_key)

    # Build the shared observation op
    observe_op = _build_observation_op_for_specs(
        name=f"{name}_observe",
        spec_by_asset_key=spec_by_asset_key,
        store_resource_key=store_resource_key,
    )

    # Build job metadata with asset references
    job_metadata: dict[str, Any] = {
        "metaxy/features": [fk.to_string() for fk in feature_keys],
    }
    for spec in metaxy_specs:
        job_metadata[f"metaxy/asset/{spec.key.to_user_string()}"] = (
            dg.MetadataValue.asset(spec.key)
        )

    # Build description as markdown list showing both assets and features
    asset_list = "\n".join(
        f"- `{spec.key.to_user_string()}` → `{spec.metadata[DAGSTER_METAXY_FEATURE_METADATA_KEY]}`"
        for spec in metaxy_specs
    )
    description = f"Observe {len(metaxy_specs)} Metaxy assets:\n\n{asset_list}"

    @dg.job(
        name=name,
        partitions_def=partitions_def,
        tags=tags,
        description=description,
        metadata=job_metadata,
        **kwargs,
    )
    def observation_job() -> None:
        asset_keys_dynamic = fanout_assets()
        asset_keys_dynamic.map(observe_op)

    return observation_job
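The fan-out op above derives each dynamic output's mapping key from the asset key string by replacing `/` with `__`, presumably because Dagster restricts mapping keys to identifier-like characters. The standalone sketch below reproduces that transformation without Dagster; `to_mapping_key` and `find_collisions` are hypothetical helpers, not part of Metaxy. It also illustrates a corner case that may be worth checking in your own project: two asset keys such as `a/b` and `a__b` would end up with the same mapping key.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def to_mapping_key(asset_key_str: str) -> str:
    """Replace "/" with "__", as the fan-out op does for its mapping keys."""
    return asset_key_str.replace("/", "__")


def find_collisions(asset_keys: Iterable[str]) -> Dict[str, List[str]]:
    """Group asset keys by mapping key and keep only groups with several members."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for key in asset_keys:
        groups[to_mapping_key(key)].append(key)
    return {mk: keys for mk, keys in groups.items() if len(keys) > 1}
```

Running `find_collisions` over your asset key strings before building the job is one way to confirm that every asset gets a distinct mapping key.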
metaxy.ext.dagster.observation_job.build_metaxy_observation_job
¶
build_metaxy_observation_job(asset: AssetSpec | AssetsDefinition, *, store_resource_key: str = 'store', tags: dict[str, str] | None = None) -> list[JobDefinition]
Build Dagster job(s) that observe Metaxy feature asset(s).
Creates job(s) that yield AssetObservation events for the given asset.
The job can be run independently from asset materialization, e.g., on a schedule.
Returns one job per metaxy/feature spec found in the asset.
Jobs are constructed with matching partitions definitions.
Job names are always derived as observe_<FeatureKey.table_name()>.
Parameters:
-
asset(AssetSpec | AssetsDefinition) –Asset spec or asset definition to observe. Must have metaxy/feature metadata on at least one spec.
-
store_resource_key(str, default:'store') –Resource key for the MetadataStore (default: "store").
-
tags(dict[str, str] | None, default:None) –Optional tags to apply to the job(s).
Returns:
-
list[JobDefinition]–List of Dagster job definitions, one per metaxy/feature spec.
Raises:
-
ValueError–If no specs have metaxy/feature metadata.
Example
import dagster as dg
import metaxy.ext.dagster as mxd

@mxd.metaxify()
@dg.asset(metadata={"metaxy/feature": "my/feature"})
def my_asset():
    ...

# Build the observation jobs (a list, one per metaxy/feature spec) - partitions_def is extracted automatically
observation_jobs = mxd.build_metaxy_observation_job(my_asset)

# Include in your Definitions
defs = dg.Definitions(
    jobs=observation_jobs,
    # my_store_resource: your MetadataStore resource, defined elsewhere
    resources={"store": my_store_resource},
)
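Because the function returns a list, the results for several assets need to be flattened before they are handed to a single job collection. The sketch below models this with plain Python: `FakeSpec` and `build_jobs` are hypothetical stand-ins for Dagster specs and for `build_metaxy_observation_job`, and the job names are placeholders rather than the real `observe_<FeatureKey.table_name()>` names.

```python
from dataclasses import dataclass, field
from itertools import chain
from typing import Dict, List

FEATURE_KEY = "metaxy/feature"  # metadata key used throughout this page


@dataclass
class FakeSpec:
    """Hypothetical stand-in for a Dagster AssetSpec."""

    key: str
    metadata: Dict[str, str] = field(default_factory=dict)


def build_jobs(specs: List[FakeSpec]) -> List[str]:
    """Return one placeholder job name per spec that carries metaxy/feature metadata."""
    selected = [s for s in specs if s.metadata.get(FEATURE_KEY) is not None]
    if not selected:
        raise ValueError("no specs with 'metaxy/feature' metadata")
    return [f"observe:{s.key}" for s in selected]


multi_asset = [
    FakeSpec("out_a", {FEATURE_KEY: "my/feature_a"}),
    FakeSpec("out_b", {FEATURE_KEY: "my/feature_b"}),
    FakeSpec("plain_output"),  # no Metaxy metadata, so it is skipped
]
single_asset = [FakeSpec("other", {FEATURE_KEY: "my/other"})]

# Flatten the per-asset lists into one flat list of jobs.
all_jobs = list(chain.from_iterable(build_jobs(a) for a in (multi_asset, single_asset)))
```

With the real function, the same flattening pattern would apply to the lists of `JobDefinition` objects it returns.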
Source code in src/metaxy/ext/dagster/observation_job.py
def build_metaxy_observation_job(
    asset: dg.AssetSpec | dg.AssetsDefinition,
    *,
    store_resource_key: str = "store",
    tags: dict[str, str] | None = None,
) -> list[dg.JobDefinition]:
    """Build Dagster job(s) that observe Metaxy feature asset(s).

    Creates job(s) that yield `AssetObservation` events for the given asset.
    The job can be run independently from asset materialization, e.g., on a schedule.

    Returns one job per `metaxy/feature` spec found in the asset.
    Jobs are constructed with matching partitions definitions.
    Job names are always derived as `observe_<FeatureKey.table_name()>`.

    Args:
        asset: Asset spec or asset definition to observe. Must have `metaxy/feature`
            metadata on at least one spec.
        store_resource_key: Resource key for the MetadataStore (default: `"store"`).
        tags: Optional tags to apply to the job(s).

    Returns:
        List of Dagster job definitions, one per `metaxy/feature` spec.

    Raises:
        ValueError: If no specs have `metaxy/feature` metadata.

    Example:
        ```python
        import dagster as dg
        import metaxy.ext.dagster as mxd

        @mxd.metaxify()
        @dg.asset(metadata={"metaxy/feature": "my/feature"})
        def my_asset():
            ...

        # Build the observation jobs (a list, one per metaxy/feature spec) - partitions_def is extracted automatically
        observation_jobs = mxd.build_metaxy_observation_job(my_asset)

        # Include in your Definitions
        defs = dg.Definitions(
            jobs=observation_jobs,
            # my_store_resource: your MetadataStore resource, defined elsewhere
            resources={"store": my_store_resource},
        )
        ```
    """
    # Extract specs and partitions_def from asset
    if isinstance(asset, dg.AssetSpec):
        specs = [asset]
        partitions_def = None
    elif isinstance(asset, dg.AssetsDefinition):
        specs = list(asset.specs)
        partitions_def = asset.partitions_def
    else:
        raise TypeError(
            f"Expected AssetSpec or AssetsDefinition, got {type(asset).__name__}"
        )

    # Filter to specs with metaxy/feature metadata
    metaxy_specs = [
        spec
        for spec in specs
        if spec.metadata.get(DAGSTER_METAXY_FEATURE_METADATA_KEY) is not None
    ]
    if not metaxy_specs:
        raise ValueError(
            "Asset has no specs with 'metaxy/feature' metadata. "
            "Ensure your asset has metadata={'metaxy/feature': 'feature/key'}."
        )

    # Build jobs for each metaxy spec
    jobs = [
        _build_observation_job_for_spec(
            spec,
            partitions_def=partitions_def,
            store_resource_key=store_resource_key,
            tags=tags,
        )
        for spec in metaxy_specs
    ]
    return jobs
Resources¶
metaxy.ext.dagster.MetaxyStoreFromConfigResource
¶
Bases: ConfigurableResource[MetadataStore]
This resource creates a metaxy.MetadataStore based on the current Metaxy configuration (metaxy.toml).
If name is not provided, the default store will be used.
It can be set with store = "my_name" in metaxy.toml or with the $METAXY_STORE environment variable.
Functions¶
metaxy.ext.dagster.MetaxyStoreFromConfigResource.create_resource
¶
create_resource(context: InitResourceContext) -> MetadataStore
Create a MetadataStore from the Metaxy configuration.
Parameters:
-
context(InitResourceContext) –Dagster resource initialization context.
Returns:
-
MetadataStore–A MetadataStore configured with the Dagster run ID as the materialization ID.
Source code in src/metaxy/ext/dagster/resources.py
def create_resource(self, context: dg.InitResourceContext) -> mx.MetadataStore:
    """Create a MetadataStore from the Metaxy configuration.

    Args:
        context: Dagster resource initialization context.

    Returns:
        A MetadataStore configured with the Dagster run ID as the materialization ID.
    """
    assert context.run is not None
    return mx.MetaxyConfig.get().get_store(
        self.name, materialization_id=context.run.run_id
    )
Helpers¶
metaxy.ext.dagster.utils.FeatureStats
¶
metaxy.ext.dagster.selection.select_metaxy_assets
¶
select_metaxy_assets(*, project: str | None = None, feature: CoercibleToFeatureKey | None = None) -> AssetSelection
Select Metaxy assets by project and/or feature.
This helper creates an AssetSelection that filters assets tagged by @metaxify.
Parameters:
-
project(str | None, default:None) –Filter by project name. If None, uses MetaxyConfig.get().project.
-
feature(CoercibleToFeatureKey | None, default:None) –Filter by specific feature key. If provided, further narrows the selection.
Returns:
-
AssetSelection–An AssetSelection that can be used with dg.define_asset_job, dg.materialize, or AssetSelection operations like | and &.
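Example: Select all Metaxy assets in current project
import metaxy.ext.dagster as mxd
all_metaxy = mxd.select_metaxy_assets()
Example: Select assets for a specific project
prod_assets = mxd.select_metaxy_assets(project="production")
Example: Select a specific feature's assets
feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
Example: Use with asset jobs
import dagster as dg
metaxy_job = dg.define_asset_job(
    name="materialize_metaxy",
    selection=mxd.select_metaxy_assets(),
)
Example: Combine with other selections
# All metaxy assets plus some other assets
combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")
# Metaxy assets that are also in a specific group
filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")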
Source code in src/metaxy/ext/dagster/selection.py
def select_metaxy_assets(
    *,
    project: str | None = None,
    feature: mx.CoercibleToFeatureKey | None = None,
) -> dg.AssetSelection:
    """Select Metaxy assets by project and/or feature.

    This helper creates an `AssetSelection` that filters assets tagged by `@metaxify`.

    Args:
        project: Filter by project name. If None, uses `MetaxyConfig.get().project`.
        feature: Filter by specific feature key. If provided, further narrows the selection.

    Returns:
        An `AssetSelection` that can be used with `dg.define_asset_job`,
        `dg.materialize`, or `AssetSelection` operations like `|` and `&`.

    Example: Select all Metaxy assets in current project
        ```python
        import metaxy.ext.dagster as mxd

        all_metaxy = mxd.select_metaxy_assets()
        ```

    Example: Select assets for a specific project
        ```python
        prod_assets = mxd.select_metaxy_assets(project="production")
        ```

    Example: Select a specific feature's assets
        ```python
        feature_assets = mxd.select_metaxy_assets(feature="my/feature/key")
        ```

    Example: Use with asset jobs
        ```python
        metaxy_job = dg.define_asset_job(
            name="materialize_metaxy",
            selection=mxd.select_metaxy_assets(),
        )
        ```

    Example: Combine with other selections
        ```python
        # All metaxy assets plus some other assets
        combined = mxd.select_metaxy_assets() | dg.AssetSelection.keys("other_asset")

        # Metaxy assets that are also in a specific group
        filtered = mxd.select_metaxy_assets() & dg.AssetSelection.groups("my_group")
        ```
    """
    resolved_project = project if project is not None else mx.MetaxyConfig.get().project

    selection = dg.AssetSelection.tag(DAGSTER_METAXY_PROJECT_TAG_KEY, resolved_project)

    if feature is not None:
        feature_key = mx.coerce_to_feature_key(feature)
        selection = selection & dg.AssetSelection.tag(
            DAGSTER_METAXY_FEATURE_METADATA_KEY, str(feature_key)
        )

    return selection
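The selection logic above amounts to an intersection of two tag filters. The sketch below models it over plain dictionaries so the behavior can be tried without Dagster. Everything in it is an illustrative assumption: `select` is a hypothetical helper, the tag key strings are placeholders for the constants `DAGSTER_METAXY_PROJECT_TAG_KEY` and `DAGSTER_METAXY_FEATURE_METADATA_KEY`, and `project` is a required argument here instead of defaulting to the configured project.

```python
from typing import Dict, Optional, Set

# Placeholder tag keys; the real values live in Metaxy's constants.
PROJECT_TAG = "project_tag"
FEATURE_TAG = "feature_tag"


def select(
    catalog: Dict[str, Dict[str, str]],
    project: str,
    feature: Optional[str] = None,
) -> Set[str]:
    """Return asset names whose tags match the project and, if given, the feature."""
    selected = {name for name, tags in catalog.items() if tags.get(PROJECT_TAG) == project}
    if feature is not None:
        # Mirrors `selection & AssetSelection.tag(...)`: a set intersection.
        selected &= {name for name, tags in catalog.items() if tags.get(FEATURE_TAG) == feature}
    return selected


# Toy catalog: asset name -> tags.
catalog = {
    "a": {PROJECT_TAG: "prod", FEATURE_TAG: "my/feature"},
    "b": {PROJECT_TAG: "prod", FEATURE_TAG: "my/other"},
    "c": {PROJECT_TAG: "dev", FEATURE_TAG: "my/feature"},
    "d": {},  # untagged asset, never selected
}
```

In this model, passing only `project` keeps every asset tagged with that project, and adding `feature` can only narrow the result, which matches the parameter descriptions above.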