Aggregation¶
Overview¶
This example demonstrates how to implement aggregation (N:1) relationships with Metaxy.
In such relationships multiple parent samples produce a single child sample.
These relationships can be modeled with the LineageRelationship.aggregation lineage type.
We will use a speaker embedding pipeline as an example, where multiple audio recordings from the same speaker are aggregated to compute a single speaker embedding.
The Pipeline¶
Let's define a pipeline with two features:
---
title: Feature Graph
---
flowchart TB
%% Snapshot version: none
%%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
audio["<div style="text-align:left"><b>audio</b><br/>c9762705<br/><font color="#999">---</font><br/>- default (80200592)</div>"]
speaker_embedding["<div style="text-align:left"><b>speaker/embedding</b><br/>83296faf<br/><font color="#999">---</font><br/>- embedding (8b5b20fb)</div>"]
audio --> speaker_embedding
Defining features: Audio¶
Each audio recording has an audio_id (unique identifier) and a speaker_id (which speaker it belongs to). Multiple audio recordings can belong to the same speaker.
import metaxy as mx


class Audio(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="audio",
        id_columns=["audio_id"],
        fields=["default"],
    ),
):
    """Audio recordings of different speakers."""

    audio_id: str
    speaker_id: str
    duration_seconds: float
    path: str
Defining features: SpeakerEmbedding¶
SpeakerEmbedding aggregates all audio recordings from a speaker into a single embedding. The key configuration is the lineage parameter, which tells Metaxy that multiple Audio records with the same speaker_id are aggregated into one SpeakerEmbedding.
class SpeakerEmbedding(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="speaker/embedding",
        id_columns=["speaker_id"],
        deps=[
            mx.FeatureDep(
                feature=Audio,
                lineage=mx.LineageRelationship.aggregation(on=["speaker_id"]),
            )
        ],
        fields=[
            mx.FieldSpec(key="embedding", code_version="1"),
        ],
    ),
):
    """Speaker embedding aggregated from all their audio recordings.

    This demonstrates N:1 aggregation lineage where multiple audio recordings
    from the same speaker are aggregated into a single speaker embedding.
    """

    speaker_id: str
    n_dim: int
    path: str
The LineageRelationship.aggregation(on=["speaker_id"]) declaration is the key part. It tells Metaxy:
- Multiple Audio rows are aggregated into one SpeakerEmbedding row
- The aggregation is keyed on speaker_id: all audio with the same speaker_id contributes to one embedding
- When any audio for a speaker changes, the aggregated provenance changes, triggering recomputation of that speaker's embedding
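To make the last point concrete, here is a minimal sketch of how a per-speaker provenance value could be derived from the individual audio provenances. It is not Metaxy's actual algorithm: the aggregated_provenance helper, the use of SHA-256, and the 8-character truncation are assumptions made purely for illustration.

```python
import hashlib
from collections import defaultdict


def aggregated_provenance(rows, on="speaker_id", field="default"):
    """Return one provenance hash per group (hypothetical helper, not Metaxy API)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[on]].append(row["metaxy_provenance_by_field"][field])
    # Sort the values so the result does not depend on row order.
    return {
        key: hashlib.sha256("|".join(sorted(values)).encode()).hexdigest()[:8]
        for key, values in groups.items()
    }


audio = [
    {"audio_id": "a1", "speaker_id": "s1", "metaxy_provenance_by_field": {"default": "a1_v1"}},
    {"audio_id": "a2", "speaker_id": "s1", "metaxy_provenance_by_field": {"default": "a2_v1"}},
    {"audio_id": "a3", "speaker_id": "s2", "metaxy_provenance_by_field": {"default": "a3_v1"}},
    {"audio_id": "a4", "speaker_id": "s2", "metaxy_provenance_by_field": {"default": "a4_v1"}},
]

before = aggregated_provenance(audio)
# Simulate a change to a single recording of speaker s1.
audio[0]["metaxy_provenance_by_field"]["default"] = "a1_v2"
after = aggregated_provenance(audio)

print(before["s1"] != after["s1"])  # True: s1's aggregated value changed
print(before["s2"] == after["s2"])  # True: s2 is unaffected
```

Because the hash covers every recording in the group, changing one recording is enough to change the group's value, while other groups stay the same.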
Walkthrough¶
Here is the pipeline code that processes audio and computes speaker embeddings:
pipeline.py
"""Pipeline demonstrating N:1 aggregation lineage.
This example shows how Metaxy handles aggregation lineage where multiple
upstream records (audio recordings) are aggregated into a single downstream
record (speaker embedding).
Key concepts:
- Multiple audio recordings per speaker are aggregated into one embedding
- When any audio for a speaker changes, the speaker embedding needs recomputation
"""
import polars as pl
from example_aggregation.features import Audio, SpeakerEmbedding

import metaxy as mx

# Audio samples: 2 speakers with 2 recordings each
AUDIO_SAMPLES = pl.DataFrame(
    [
        {
            "audio_id": "a1",
            "speaker_id": "s1",
            "duration_seconds": 30.5,
            "path": "audio/s1_recording1.wav",
            "metaxy_provenance_by_field": {"default": "a1_v1"},
        },
        {
            "audio_id": "a2",
            "speaker_id": "s1",
            "duration_seconds": 45.2,
            "path": "audio/s1_recording2.wav",
            "metaxy_provenance_by_field": {"default": "a2_v1"},
        },
        {
            "audio_id": "a3",
            "speaker_id": "s2",
            "duration_seconds": 60.0,
            "path": "audio/s2_recording1.wav",
            "metaxy_provenance_by_field": {"default": "a3_v1"},
        },
        {
            "audio_id": "a4",
            "speaker_id": "s2",
            "duration_seconds": 35.8,
            "path": "audio/s2_recording2.wav",
            "metaxy_provenance_by_field": {"default": "a4_v1"},
        },
    ]
)


def main():
    cfg = mx.init_metaxy()
    store = cfg.get_store("dev")

    # Step 1: Write audio metadata
    with store:
        diff = store.resolve_update(Audio, samples=AUDIO_SAMPLES)
        if len(diff.added) > 0:
            print(f"Found {len(diff.added)} new audio recordings")
            store.write_metadata(Audio, diff.added)
        elif len(diff.changed) > 0:
            print(f"Found {len(diff.changed)} changed audio recordings")
            store.write_metadata(Audio, diff.changed)
        else:
            print("No new or changed audio recordings")

    # Step 2: Compute speaker embeddings
    with store:
        diff = store.resolve_update(SpeakerEmbedding)
        added_df = diff.added.to_polars()
        changed_df = diff.changed.to_polars()
        speakers_to_process = (
            pl.concat([added_df, changed_df])
            .select("speaker_id")
            .unique()
            .to_series()
            .to_list()
        )
        print(
            f"Found {len(speakers_to_process)} speakers that need embedding computation"
        )
        if speakers_to_process:
            embedding_data = []
            for speaker_id in speakers_to_process:
                speaker_rows = pl.concat([added_df, changed_df]).filter(
                    pl.col("speaker_id") == speaker_id
                )
                provenance_by_field = speaker_rows["metaxy_provenance_by_field"][0]
                provenance = speaker_rows["metaxy_provenance"][0]
                n_audio = len(speaker_rows)
                print(
                    f"  Computing embedding for speaker {speaker_id} from {n_audio} audio recordings"
                )
                embedding_data.append(
                    {
                        "speaker_id": speaker_id,
                        "n_dim": 512,
                        "path": f"embeddings/{speaker_id}.npy",
                        "metaxy_provenance_by_field": provenance_by_field,
                        "metaxy_provenance": provenance,
                    }
                )
            embedding_df = pl.DataFrame(embedding_data)
            print(f"Writing embeddings for {len(embedding_data)} speakers")
            store.write_metadata(SpeakerEmbedding, embedding_df)


if __name__ == "__main__":
    main()
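The loop in step 2 reads the provenance from the first row of each speaker, which relies on all rows of a speaker carrying the same aggregated provenance. The following plain-Python sketch shows that reduction in isolation and checks the assumption explicitly. The collapse_per_speaker helper and the n_audio field are hypothetical names introduced for illustration; they are not part of Metaxy or of the pipeline above.

```python
def collapse_per_speaker(rows):
    """Reduce per-audio rows to one record per speaker_id.

    Assumes every row of a speaker carries the same aggregated provenance
    and raises ValueError if that assumption does not hold.
    """
    records = {}
    counts = {}
    for row in rows:
        sid = row["speaker_id"]
        prov = row["metaxy_provenance"]
        if sid in records and records[sid]["metaxy_provenance"] != prov:
            raise ValueError(f"inconsistent provenance for speaker {sid}")
        records.setdefault(sid, {"speaker_id": sid, "metaxy_provenance": prov})
        counts[sid] = counts.get(sid, 0) + 1
    # One output record per speaker, in first-seen order.
    return [{**rec, "n_audio": counts[sid]} for sid, rec in records.items()]


rows = [
    {"speaker_id": "s1", "metaxy_provenance": "p1"},
    {"speaker_id": "s1", "metaxy_provenance": "p1"},
    {"speaker_id": "s2", "metaxy_provenance": "p2"},
]
collapsed = collapse_per_speaker(rows)
print([(r["speaker_id"], r["n_audio"]) for r in collapsed])  # [('s1', 2), ('s2', 1)]
```

Failing loudly on inconsistent provenance is a design choice of this sketch: silently taking the first row would hide a bug if the rows ever disagreed.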
Step 1: Initial Run¶
Run the pipeline to create audio recordings and speaker embeddings:
Found 4 new audio recordings
Found 2 speakers that need embedding computation
Computing embedding for speaker s1 from 2 audio recordings
Computing embedding for speaker s2 from 2 audio recordings
Writing embeddings for 2 speakers
All features have been materialized:
- 4 audio recordings (2 per speaker)
- 2 speaker embeddings (one per speaker)
Step 2: Verify Idempotency¶
Run the pipeline again without any changes:
Nothing needs recomputation - the system correctly detects no changes.
Step 3: Update One Audio Recording¶
Now let's update the provenance of audio a1 (belonging to speaker s1):
patches/01_update_audio_provenance.patch
--- a/pipeline.py
+++ b/pipeline.py
@@ -22,7 +22,7 @@ AUDIO_SAMPLES = pl.DataFrame(
             "speaker_id": "s1",
             "duration_seconds": 30.5,
             "path": "audio/s1_recording1.wav",
-            "metaxy_provenance_by_field": {"default": "a1_v1"},
+            "metaxy_provenance_by_field": {"default": "a1_v2"},
         },
         {
             "audio_id": "a2",
This represents a change to one audio recording (perhaps it was re-processed or updated).
Step 4: Observe Selective Recomputation¶
Run the pipeline again after the audio change:
Found 1 changed audio recordings
Found 1 speakers that need embedding computation
Computing embedding for speaker s1 from 2 audio recordings
Writing embeddings for 1 speakers
Key observation:
- Only speaker s1's embedding is recomputed (because audio a1 belongs to s1)
- Speaker s2's embedding is not recomputed (none of their audio changed)
This demonstrates that Metaxy correctly tracks aggregation lineage - when any audio for a speaker changes, only that speaker's embedding needs recomputation.
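The selective behaviour can be pictured as a comparison between the provenance stored for each speaker and the provenance computed from the current audio. The sketch below is a conceptual model only: speakers_to_recompute and the placeholder hash strings are assumptions for illustration, not Metaxy's implementation of resolve_update.

```python
def speakers_to_recompute(stored, current):
    """Split speaker ids into added (not stored yet) and changed (provenance differs)."""
    added = sorted(k for k in current if k not in stored)
    changed = sorted(k for k in current if k in stored and stored[k] != current[k])
    return added, changed


# Placeholder provenance values: only s1's value differs from what is stored.
stored = {"s1": "hash_a", "s2": "hash_b"}
current = {"s1": "hash_c", "s2": "hash_b"}

added, changed = speakers_to_recompute(stored, current)
print(added, changed)  # [] ['s1']
```

Speaker s2 appears in neither list, which corresponds to its embedding being left alone.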
Step 5: Add New Audio¶
Now let's add a new audio recording for speaker s1:
patches/02_add_audio.patch
--- a/pipeline.py
+++ b/pipeline.py
@@ -45,6 +45,13 @@
             "path": "audio/s2_recording2.wav",
             "metaxy_provenance_by_field": {"default": "a4_v1"},
         },
+        {
+            "audio_id": "a5",
+            "speaker_id": "s1",
+            "duration_seconds": 25.0,
+            "path": "audio/s1_recording3.wav",
+            "metaxy_provenance_by_field": {"default": "a5_v1"},
+        },
     ]
 )
Step 6: Observe Aggregation Update¶
Run the pipeline again:
Found 1 new audio recordings
Found 1 speakers that need embedding computation
Computing embedding for speaker s1 from 3 audio recordings
Writing embeddings for 1 speakers
Key observation:
- Only speaker s1's embedding is recomputed (the new audio belongs to s1)
- Speaker s1 now has 3 audio recordings (up from 2)
- Speaker s2 remains unchanged
How It Works¶
Metaxy uses window functions to compute aggregated provenance without reducing rows. When resolving updates for SpeakerEmbedding:
- All audio rows for the same speaker get identical aggregated provenance values
- The aggregated provenance is computed from the individual audio provenances
- When any audio for a speaker changes, the aggregated provenance changes
- This triggers recomputation of only the affected speaker's embedding
The user's pipeline code performs the actual aggregation (grouping by speaker_id). Metaxy only tracks the provenance and determines what needs recomputation.
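The phrase "without reducing rows" describes window-style semantics, similar to SQL's OVER (PARTITION BY speaker_id): each input row is kept and annotated with a value computed over its whole group. Here is a small plain-Python sketch of that idea. The with_group_provenance helper, the provenance and group_provenance keys, and the "+"-joined string are illustrative assumptions, not how Metaxy represents provenance internally.

```python
from collections import defaultdict


def with_group_provenance(rows, on="speaker_id"):
    """Annotate every row with a value derived from all rows in its group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[on]].append(row["provenance"])
    # One combined value per group; sorted so row order does not matter.
    combined = {key: "+".join(sorted(values)) for key, values in groups.items()}
    # The number of rows is unchanged: each row just gains a new key.
    return [{**row, "group_provenance": combined[row[on]]} for row in rows]


rows = [
    {"audio_id": "a1", "speaker_id": "s1", "provenance": "a1_v1"},
    {"audio_id": "a2", "speaker_id": "s1", "provenance": "a2_v1"},
    {"audio_id": "a3", "speaker_id": "s2", "provenance": "a3_v1"},
]
annotated = with_group_provenance(rows)
print(len(annotated))  # 3
print(annotated[0]["group_provenance"])  # a1_v1+a2_v1
print(annotated[1]["group_provenance"])  # a1_v1+a2_v1
```

Keeping one row per audio recording is what lets the pipeline count the recordings per speaker while still reading a single, consistent provenance value for the group.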
Conclusion¶
Metaxy provides a convenient API for modeling aggregation relationships: LineageRelationship.aggregation. Other Metaxy features continue to work seamlessly with aggregation relationships.
Related Materials¶
Learn more about:
- Features and Fields
- Relationships
- One-to-Many Expansion (the inverse relationship)