Metaxy + Dagster¶
Metaxy's dependency system was originally inspired by Dagster.
Because of this, Metaxy code composes naturally with Dagster code, Metaxy concepts map directly onto Dagster concepts, and the provided @metaxify decorator makes the integration effortless.
The only step that has to be taken in order to inject Metaxy into Dagster assets is to associate the Dagster asset with the Metaxy feature.
Unleash the full power of @metaxify on Dagster!
Example
Using "metaxy/feature" Dagster metadata key
Set the well-known "metaxy/feature" metadata key on the Dagster asset and point it to the Metaxy feature key.
@metaxify will then take care of bringing the right lineage, description, metadata, and other transferable properties from the Metaxy feature to the Dagster asset.
What's in the box¶
This integration provides:
- metaxify: a decorator that enriches Dagster asset definitions with Metaxy information such as upstream dependencies, description, metadata, code version, table schema, column lineage, and so on.
- MetaxyStoreFromConfigResource: a resource that provides access to MetadataStore.
- MetaxyIOManager: an IO manager that reads and writes Dagster assets that are Metaxy features.
- generate_materialize_results / generate_observe_results: generators for yielding dagster.MaterializeResult or dagster.ObserveResult events from Dagster assets (and multi-assets), with automatic topological ordering, partition filtering, logging row counts, and setting Dagster data versions.
- observable_metaxy_asset: a decorator that creates observable source assets for monitoring external Metaxy features.
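The "automatic topological ordering" mentioned for the result generators means that results for an upstream feature are emitted before results for anything that depends on it. The snippet below is a minimal standard-library sketch of that idea, not Metaxy's implementation. The `deps` mapping and the `ordered_feature_keys` helper are hypothetical, and the feature keys are borrowed from the Quick Start.

```python
from graphlib import TopologicalSorter


def ordered_feature_keys(deps: dict[str, list[str]]) -> list[str]:
    """Return feature keys so that every upstream comes before its dependents."""
    # TopologicalSorter expects {node: predecessors}, which matches
    # a {feature: upstream features} mapping directly.
    return list(TopologicalSorter(deps).static_order())


# Hypothetical dependency mapping: clusters depend on embeddings.
deps = {
    "audio/clusters": ["audio/embeddings"],
    "audio/embeddings": [],
}
order = ordered_feature_keys(deps)
print(order)
```

Emitting results in this order keeps downstream events from being reported before the upstream data they were derived from.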
Quick Start¶
1. Define Metaxy Features¶
import metaxy as mx

# Upstream feature
upstream_spec = mx.FeatureSpec(
    key="audio/embeddings",
    id_columns=["audio_id"],
    fields=["embedding"],
)


class AudioEmbeddings(mx.BaseFeature, spec=upstream_spec):
    audio_id: str


# Downstream feature that depends on upstream
downstream_spec = mx.FeatureSpec(
    key="audio/clusters",
    id_columns=["audio_id"],
    fields=["cluster_id"],
    deps=[AudioEmbeddings],
)


class AudioClusters(mx.BaseFeature, spec=downstream_spec):
    audio_id: str
    mean: float
    std: float
2. Define Dagster Assets¶
Root Asset
Let's define an asset that doesn't have any upstream Metaxy features.
import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd
import polars as pl


@mxd.metaxify
@dg.asset(
    metadata={"metaxy/feature": "audio/embeddings"},
    io_manager_key="metaxy_io_manager",
)
def audio_embeddings(
    store: dg.ResourceParam[mx.MetadataStore],
):
    # somehow, acquire root source data
    samples = pl.DataFrame(
        {
            "audio_id": ["a1", "a2", "a3"],
            "metaxy_provenance_by_field": [
                {"embedding": "hash1"},
                {"embedding": "hash2"},
                {"embedding": "hash3"},
            ],
        }
    )
    # resolve the increment with Metaxy
    with store:
        increment = store.resolve_update("audio/embeddings", samples=samples)

    # Compute embeddings...
    df = ...  # at this point the dataframe should contain the computed columns for this feature
    # either write embeddings metadata via Metaxy
    # or return a dataframe to write it via MetaxyIOManager
    return df
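Conceptually, resolving an increment means comparing the provenance of the incoming samples with what the store already holds, so that only new or changed rows need recomputing. The sketch below illustrates that idea in plain Python. It is not Metaxy's implementation: `resolve_increment`, the `stored` mapping, and the `added`/`changed` result keys are hypothetical names chosen for illustration.

```python
Provenance = dict[str, str]  # field name -> provenance hash


def resolve_increment(
    samples: dict[str, Provenance],
    stored: dict[str, Provenance],
) -> dict[str, list[str]]:
    """Classify incoming sample IDs by comparing per-field provenance hashes."""
    # IDs the store has never seen.
    added = [sid for sid in samples if sid not in stored]
    # IDs the store knows, but whose provenance differs from the stored one.
    changed = [
        sid for sid in samples if sid in stored and samples[sid] != stored[sid]
    ]
    return {"added": added, "changed": changed}


# Incoming samples mirror the example asset; the stored state is made up.
samples = {
    "a1": {"embedding": "hash1"},
    "a2": {"embedding": "hash2"},
    "a3": {"embedding": "hash3"},
}
stored = {
    "a1": {"embedding": "hash1"},     # unchanged
    "a2": {"embedding": "old-hash"},  # provenance differs
}
increment = resolve_increment(samples, stored)
print(increment)
```

Under these assumptions only "a2" and "a3" would need their embeddings computed, while "a1" is skipped.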
Downstream Asset
Non-Metaxy Downstream Asset
import dagster as dg
import narwhals as nw


@dg.asset(
    ins={
        "clusters": dg.AssetIn(
            key=["audio", "clusters"],
        )
    },
)
def cluster_report(clusters: nw.LazyFrame):
    # clusters is a narwhals LazyFrame loaded via MetaxyIOManager
    df = clusters.collect().to_polars()
    # Generate a report...
    return {"total_clusters": df.select("cluster_id").n_unique()}
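Note that the feature key "audio/clusters" appears here as the Dagster asset key ["audio", "clusters"]. Assuming the correspondence is a plain split on "/" (an inference from this example rather than a documented rule), it can be sketched as follows. `feature_key_to_asset_path` is a hypothetical helper, not part of Metaxy or Dagster.

```python
def feature_key_to_asset_path(feature_key: str) -> list[str]:
    """Split a slash-separated feature key into asset key path components."""
    # Assumption: each "/"-separated part becomes one path component.
    return feature_key.split("/")


path = feature_key_to_asset_path("audio/clusters")
print(path)
```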
3. Create Dagster Definitions¶
import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd

store = mxd.MetaxyStoreFromConfigResource(name="dev")
metaxy_io_manager = mxd.MetaxyIOManager(store=store)


@dg.definitions
def definitions():
    # This loads Metaxy configuration and feature definitions
    mx.init_metaxy()
    return dg.Definitions(
        assets=[
            audio_embeddings,
            audio_clusters,
            cluster_report,
        ],
        resources={
            "store": store,
            "metaxy_io_manager": metaxy_io_manager,
        },
    )
4. Start Dagster¶
Materialize your assets and let Metaxy take care of state and versioning!
Observable Source Assets¶
Use observable_metaxy_asset to create observable source assets that monitor external Metaxy features.
This is useful when Metaxy features are populated outside of Dagster (e.g., by external pipelines) and you want Dagster to track their data versions.
Basic Observable Asset
import dagster as dg
import metaxy as mx
import metaxy.ext.dagster as mxd
import narwhals as nw


@mxd.observable_metaxy_asset(key="dagster/asset/key", feature="external/feature")
def external_data(context, store: dg.ResourceParam[mx.MetadataStore], lazy_df: nw.LazyFrame):
    # build a custom metadata dict
    metadata = ...
    return metadata
The observation automatically tracks:
- Data version: Uses mean(metaxy_created_at) to detect both additions and deletions
- Row count: Logged as dagster/row_count metadata
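One plausible reading of why a mean is used here: deleting an older row leaves the maximum timestamp untouched, whereas the mean shifts on both additions and deletions. The sketch below demonstrates this with made-up timestamps. The `data_version` helper is hypothetical and does not reproduce how Metaxy actually formats the data version.

```python
from datetime import datetime, timezone


def data_version(created_at: list[datetime]) -> str:
    """Derive a version string from the mean creation time (epoch seconds)."""
    mean_ts = sum(ts.timestamp() for ts in created_at) / len(created_at)
    return f"{mean_ts:.6f}"


# Made-up creation timestamps for three rows.
rows = [
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 2, tzinfo=timezone.utc),
    datetime(2024, 1, 3, tzinfo=timezone.utc),
]

base = data_version(rows)
# A newly added row shifts the mean.
after_addition = data_version(rows + [datetime(2024, 1, 4, tzinfo=timezone.utc)])
# Removing the oldest row also shifts the mean, although max() stays the same.
after_deletion = data_version(rows[1:])

print(base, after_addition, after_deletion)
```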