
Aggregation

Overview


This example demonstrates how to implement aggregation (N:1) relationships with Metaxy. In such relationships, multiple parent samples produce a single child sample.

These relationships can be modeled with the LineageRelationship.aggregation lineage type.

We will use a speaker embedding pipeline as an example, where multiple audio recordings from the same speaker are aggregated to compute a single speaker embedding.
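To make the N:1 shape concrete, here is a plain-Python sketch (not Metaxy code) that groups the four audio records used later in this example by speaker_id. Each group of parent rows corresponds to exactly one child row. The `group_parents` helper is hypothetical and exists only for illustration.

```python
from collections import defaultdict

# Parent rows as (audio_id, speaker_id), mirroring the samples in pipeline.py
audio = [("a1", "s1"), ("a2", "s1"), ("a3", "s2"), ("a4", "s2")]


def group_parents(rows):
    """Hypothetical helper: map each child key (speaker_id) to its parent audio_ids."""
    groups = defaultdict(list)
    for audio_id, speaker_id in rows:
        groups[speaker_id].append(audio_id)
    return dict(groups)


children = group_parents(audio)
print(children)  # {'s1': ['a1', 'a2'], 's2': ['a3', 'a4']}
```

Four parents collapse into two children, which is exactly the relationship SpeakerEmbedding declares against Audio.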

The Pipeline

Let's define a pipeline with two features:

Feature Graph: audio → speaker/embedding

  • audio (c9762705), fields: default (80200592)
  • speaker/embedding (83296faf), fields: embedding (8b5b20fb)

Defining features: Audio

Each audio recording has an audio_id (unique identifier) and a speaker_id (which speaker it belongs to). Multiple audio recordings can belong to the same speaker.

src/example_aggregation/features.py
import metaxy as mx


class Audio(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="audio",
        id_columns=["audio_id"],
        fields=["default"],
    ),
):
    """Audio recordings of different speakers."""

    audio_id: str
    speaker_id: str
    duration_seconds: float
    path: str

Defining features: SpeakerEmbedding

SpeakerEmbedding aggregates all audio recordings from a speaker into a single embedding. The key configuration is the lineage parameter of FeatureDep, which tells Metaxy that multiple Audio records with the same speaker_id are aggregated into one SpeakerEmbedding.

src/example_aggregation/features.py
class SpeakerEmbedding(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="speaker/embedding",
        id_columns=["speaker_id"],
        deps=[
            mx.FeatureDep(
                feature=Audio,
                lineage=mx.LineageRelationship.aggregation(on=["speaker_id"]),
            )
        ],
        fields=[
            mx.FieldSpec(key="embedding", code_version="1"),
        ],
    ),
):
    """Speaker embedding aggregated from all their audio recordings.

    This demonstrates N:1 aggregation lineage where multiple audio recordings
    from the same speaker are aggregated into a single speaker embedding.
    """

    speaker_id: str
    n_dim: int
    path: str

The LineageRelationship.aggregation(on=["speaker_id"]) declaration is the key part. It tells Metaxy that:

  1. Multiple Audio rows are aggregated into one SpeakerEmbedding row.
  2. The aggregation is keyed on speaker_id: all audio recordings with the same speaker_id contribute to one embedding.
  3. When any audio recording of a speaker changes (or a new one is added), the aggregated provenance changes, triggering recomputation of that speaker's embedding.
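One way to picture the aggregated provenance in point 3 is a single hash over the sorted provenances of every upstream row in a group. The sketch below is an assumption for illustration: `aggregate_provenance` is a hypothetical helper, it groups on a single key for simplicity (the real `on` argument is a list), and Metaxy's actual hashing scheme may differ.

```python
import hashlib


def aggregate_provenance(rows, key):
    """Hypothetical helper: one aggregated provenance value per group.

    rows: dicts with the group column `key`, "audio_id" and "provenance".
    Sorting the (audio_id, provenance) pairs makes the result order-independent.
    """
    by_group = {}
    for row in rows:
        by_group.setdefault(row[key], []).append((row["audio_id"], row["provenance"]))
    return {
        group: hashlib.sha256(repr(sorted(items)).encode()).hexdigest()[:16]
        for group, items in by_group.items()
    }


rows = [
    {"audio_id": "a1", "speaker_id": "s1", "provenance": "a1_v1"},
    {"audio_id": "a2", "speaker_id": "s1", "provenance": "a2_v1"},
    {"audio_id": "a3", "speaker_id": "s2", "provenance": "a3_v1"},
    {"audio_id": "a4", "speaker_id": "s2", "provenance": "a4_v1"},
]
agg = aggregate_provenance(rows, key="speaker_id")
print(sorted(agg))  # ['s1', 's2']: one value per speaker, not per recording

# The order in which upstream rows arrive does not affect the result
assert agg == aggregate_provenance(list(reversed(rows)), key="speaker_id")
```

Because every member of a group feeds the hash, changing or adding any one recording changes that group's value and leaves the other groups untouched.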

Walkthrough

Here is the pipeline code that processes audio and computes speaker embeddings:

pipeline.py
"""Pipeline demonstrating N:1 aggregation lineage.

This example shows how Metaxy handles aggregation lineage where multiple
upstream records (audio recordings) are aggregated into a single downstream
record (speaker embedding).

Key concepts:
- Multiple audio recordings per speaker are aggregated into one embedding
- When any audio for a speaker changes, the speaker embedding needs recomputation
"""

import polars as pl
from example_aggregation.features import Audio, SpeakerEmbedding

import metaxy as mx

# Audio samples: 2 speakers with 2 recordings each
AUDIO_SAMPLES = pl.DataFrame(
    [
        {
            "audio_id": "a1",
            "speaker_id": "s1",
            "duration_seconds": 30.5,
            "path": "audio/s1_recording1.wav",
            "metaxy_provenance_by_field": {"default": "a1_v1"},
        },
        {
            "audio_id": "a2",
            "speaker_id": "s1",
            "duration_seconds": 45.2,
            "path": "audio/s1_recording2.wav",
            "metaxy_provenance_by_field": {"default": "a2_v1"},
        },
        {
            "audio_id": "a3",
            "speaker_id": "s2",
            "duration_seconds": 60.0,
            "path": "audio/s2_recording1.wav",
            "metaxy_provenance_by_field": {"default": "a3_v1"},
        },
        {
            "audio_id": "a4",
            "speaker_id": "s2",
            "duration_seconds": 35.8,
            "path": "audio/s2_recording2.wav",
            "metaxy_provenance_by_field": {"default": "a4_v1"},
        },
    ]
)


def main():
    cfg = mx.init_metaxy()
    store = cfg.get_store("dev")

    # Step 1: Write audio metadata
    with store:
        diff = store.resolve_update(Audio, samples=AUDIO_SAMPLES)
        # Handle new and changed rows independently: one run may contain both.
        if len(diff.added) > 0:
            print(f"Found {len(diff.added)} new audio recordings")
            store.write_metadata(Audio, diff.added)
        if len(diff.changed) > 0:
            print(f"Found {len(diff.changed)} changed audio recordings")
            store.write_metadata(Audio, diff.changed)
        if len(diff.added) == 0 and len(diff.changed) == 0:
            print("No new or changed audio recordings")

    # Step 2: Compute speaker embeddings
    with store:
        diff = store.resolve_update(SpeakerEmbedding)

        # With aggregation lineage, these frames hold one row per upstream Audio
        # record (not one per speaker); all rows of a speaker share the same
        # aggregated provenance.
        to_process = pl.concat([diff.added.to_polars(), diff.changed.to_polars()])

        speakers_to_process = (
            to_process.select("speaker_id")
            .unique(maintain_order=True)
            .to_series()
            .to_list()
        )

        print(
            f"Found {len(speakers_to_process)} speakers that need embedding computation"
        )

        if speakers_to_process:
            embedding_data = []

            for speaker_id in speakers_to_process:
                speaker_rows = to_process.filter(pl.col("speaker_id") == speaker_id)

                # The aggregated provenance is identical on every row of this
                # speaker, so the first row is representative.
                provenance_by_field = speaker_rows["metaxy_provenance_by_field"][0]
                provenance = speaker_rows["metaxy_provenance"][0]

                # A real pipeline would load and combine this speaker's recordings
                # here; this example only records embedding metadata.
                n_audio = len(speaker_rows)
                print(
                    f"  Computing embedding for speaker {speaker_id} from {n_audio} audio recordings"
                )

                embedding_data.append(
                    {
                        "speaker_id": speaker_id,
                        "n_dim": 512,
                        "path": f"embeddings/{speaker_id}.npy",
                        "metaxy_provenance_by_field": provenance_by_field,
                        "metaxy_provenance": provenance,
                    }
                )

            embedding_df = pl.DataFrame(embedding_data)
            print(f"Writing embeddings for {len(embedding_data)} speakers")
            store.write_metadata(SpeakerEmbedding, embedding_df)


if __name__ == "__main__":
    main()

Step 1: Initial Run

Run the pipeline to create audio recordings and speaker embeddings:

$ python pipeline.py
Found 4 new audio recordings
Found 2 speakers that need embedding computation
  Computing embedding for speaker s1 from 2 audio recordings
  Computing embedding for speaker s2 from 2 audio recordings
Writing embeddings for 2 speakers

All features have been materialized:

  • 4 audio recordings (2 per speaker)
  • 2 speaker embeddings (one per speaker)

Step 2: Verify Idempotency

Run the pipeline again without any changes:

$ python pipeline.py
No new or changed audio recordings
Found 0 speakers that need embedding computation

Nothing needs recomputation: Metaxy detects that none of the inputs have changed.
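The decision made for SpeakerEmbedding in these two runs can be pictured as comparing the aggregated provenance already stored for each speaker with the freshly computed one. The `classify` function below is a hypothetical sketch of that comparison, not Metaxy's API, and the provenance strings are placeholders.

```python
def classify(stored, incoming):
    """Hypothetical: split speaker ids into (added, changed).

    stored / incoming: dict mapping speaker_id -> aggregated provenance.
    """
    added = sorted(k for k in incoming if k not in stored)
    changed = sorted(k for k in incoming if k in stored and stored[k] != incoming[k])
    return added, changed


incoming = {"s1": "prov_s1", "s2": "prov_s2"}

# First run: nothing is stored yet, so every speaker is new
print(classify({}, incoming))        # (['s1', 's2'], [])
# Second run: stored values equal the incoming ones, so there is nothing to do
print(classify(incoming, incoming))  # ([], [])
```

Idempotency falls out of the comparison: identical inputs produce identical aggregated values, and identical values mean an empty diff.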

Step 3: Update One Audio Recording

Now let's update the provenance of audio a1 (belonging to speaker s1):

patches/01_update_audio_provenance.patch
--- a/pipeline.py
+++ b/pipeline.py
@@ -22,7 +22,7 @@ AUDIO_SAMPLES = pl.DataFrame(
             "speaker_id": "s1",
             "duration_seconds": 30.5,
             "path": "audio/s1_recording1.wav",
-            "metaxy_provenance_by_field": {"default": "a1_v1"},
+            "metaxy_provenance_by_field": {"default": "a1_v2"},
         },
         {
             "audio_id": "a2",

This represents a change to one audio recording (perhaps it was re-processed or updated).

Step 4: Observe Selective Recomputation

Run the pipeline again after the audio change:

$ python pipeline.py
Found 1 changed audio recordings
Found 1 speakers that need embedding computation
  Computing embedding for speaker s1 from 2 audio recordings
Writing embeddings for 1 speakers

Key observation:

  • Only speaker s1's embedding is recomputed (because audio a1 belongs to s1)
  • Speaker s2's embedding is not recomputed (none of their audio changed)

This demonstrates that Metaxy correctly tracks aggregation lineage: when any audio recording of a speaker changes, only that speaker's embedding needs recomputation.
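A hypothetical sketch of why only s1 is selected, assuming (for illustration only, not Metaxy's implementation) that the aggregated value is a hash of each speaker's sorted (audio_id, provenance) pairs. Bumping a1 from a1_v1 to a1_v2 alters s1's value while s2's stays identical.

```python
import hashlib


def aggregate(rows):
    """Hypothetical: hash the sorted (audio_id, provenance) pairs of each speaker."""
    groups = {}
    for audio_id, speaker_id, prov in rows:
        groups.setdefault(speaker_id, []).append((audio_id, prov))
    return {
        s: hashlib.sha256(repr(sorted(pairs)).encode()).hexdigest()
        for s, pairs in groups.items()
    }


before = [
    ("a1", "s1", "a1_v1"), ("a2", "s1", "a2_v1"),
    ("a3", "s2", "a3_v1"), ("a4", "s2", "a4_v1"),
]
# Patch 01: only a1's provenance is bumped
after = [("a1", "s1", "a1_v2")] + before[1:]

old, new = aggregate(before), aggregate(after)
changed = sorted(s for s in new if s in old and old[s] != new[s])
print(changed)  # ['s1']
```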

Step 5: Add New Audio

Now let's add a new audio recording for speaker s1:

patches/02_add_audio.patch
--- a/pipeline.py
+++ b/pipeline.py
@@ -45,6 +45,13 @@
             "path": "audio/s2_recording2.wav",
             "metaxy_provenance_by_field": {"default": "a4_v1"},
         },
+        {
+            "audio_id": "a5",
+            "speaker_id": "s1",
+            "duration_seconds": 25.0,
+            "path": "audio/s1_recording3.wav",
+            "metaxy_provenance_by_field": {"default": "a5_v1"},
+        },
     ]
 )

Step 6: Observe Aggregation Update

Run the pipeline again:

$ python pipeline.py
Found 1 new audio recordings
Found 1 speakers that need embedding computation
  Computing embedding for speaker s1 from 3 audio recordings
Writing embeddings for 1 speakers

Key observation:

  • Only speaker s1's embedding is recomputed (the new audio belongs to s1)
  • Speaker s1 now has 3 audio recordings (up from 2)
  • Speaker s2 remains unchanged
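Adding a5 does not modify any existing Audio row, yet s1's group gains a member, so its aggregated provenance still changes. A sketch, assuming (hypothetically) that the aggregated value is a hash over the sorted provenances of the group's members; `summarize` is an illustrative helper, not part of Metaxy.

```python
import hashlib
from collections import defaultdict


def summarize(rows):
    """Hypothetical: per speaker, (number of recordings, hash of sorted provenances)."""
    groups = defaultdict(list)
    for speaker_id, prov in rows:
        groups[speaker_id].append(prov)
    return {
        s: (len(provs), hashlib.sha256("|".join(sorted(provs)).encode()).hexdigest())
        for s, provs in groups.items()
    }


# State after patch 01 (a1 is already at v2)
before = [("s1", "a1_v2"), ("s1", "a2_v1"), ("s2", "a3_v1"), ("s2", "a4_v1")]
# Patch 02: a new recording a5 for s1; no existing row is modified
after = before + [("s1", "a5_v1")]

old, new = summarize(before), summarize(after)
print(old["s1"][0], "->", new["s1"][0])  # 2 -> 3
print(old["s1"][1] != new["s1"][1])      # True: s1 needs recomputation
print(old["s2"] == new["s2"])            # True: s2 is untouched
```

A membership change is therefore detected the same way as a content change: both alter the group's aggregated value.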

How It Works

Metaxy uses window functions to compute aggregated provenance without reducing rows. When resolving updates for SpeakerEmbedding:

  1. All audio rows for the same speaker get identical aggregated provenance values
  2. The aggregated provenance is computed from the individual audio provenances
  3. When any audio for a speaker changes, the aggregated provenance changes
  4. This triggers recomputation of only the affected speaker's embedding
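The "without reducing rows" part is what distinguishes a window function from a group-by: the per-group value is computed once and then broadcast back onto every row of that group (in Polars, for example, this shape is written with `Expr.over`). Below is a plain-Python sketch of that partition-and-broadcast pattern; the `over` helper and the string-joining combine step are hypothetical stand-ins, not Metaxy's actual provenance computation.

```python
def over(rows, partition_key, value_key, combine):
    """Compute combine(values) per partition and attach it to every row.

    Mirrors SQL `f(x) OVER (PARTITION BY key)`: the output has exactly as many
    rows as the input.
    """
    partitions = {}
    for row in rows:
        partitions.setdefault(row[partition_key], []).append(row[value_key])
    per_group = {k: combine(v) for k, v in partitions.items()}
    return [dict(row, aggregated=per_group[row[partition_key]]) for row in rows]


rows = [
    {"audio_id": "a1", "speaker_id": "s1", "prov": "a1_v1"},
    {"audio_id": "a2", "speaker_id": "s1", "prov": "a2_v1"},
    {"audio_id": "a3", "speaker_id": "s2", "prov": "a3_v1"},
    {"audio_id": "a4", "speaker_id": "s2", "prov": "a4_v1"},
]
# Hypothetical combine step: join the group's sorted provenances
out = over(rows, "speaker_id", "prov", lambda v: "+".join(sorted(v)))
for r in out:
    print(r["audio_id"], r["aggregated"])
# a1 a1_v1+a2_v1
# a2 a1_v1+a2_v1
# a3 a3_v1+a4_v1
# a4 a3_v1+a4_v1
```

This is also why the pipeline can read the provenance from the first row of each speaker: every row in the group carries the same value.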

The user's pipeline code performs the actual aggregation (grouping by speaker_id). Metaxy only tracks the provenance and determines what needs recomputation.
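As an illustration of the part that stays in user code, the sketch below mean-pools hypothetical per-recording vectors into one vector per speaker. The vectors, their dimension, and the `mean_pool` helper are assumptions for illustration; the example pipeline itself only writes metadata (n_dim and a path) and does not compute real embeddings.

```python
from collections import defaultdict


def mean_pool(recordings):
    """Hypothetical: average per-recording vectors into one vector per speaker.

    recordings: list of (speaker_id, vector) pairs; all vectors share one length.
    """
    groups = defaultdict(list)
    for speaker_id, vector in recordings:
        groups[speaker_id].append(vector)
    return {
        speaker_id: [sum(col) / len(vectors) for col in zip(*vectors)]
        for speaker_id, vectors in groups.items()
    }


# Hypothetical 2-dimensional per-recording embeddings
recordings = [
    ("s1", [1.0, 0.0]),  # a1
    ("s1", [0.0, 1.0]),  # a2
    ("s2", [2.0, 2.0]),  # a3
    ("s2", [4.0, 0.0]),  # a4
]
print(mean_pool(recordings))  # {'s1': [0.5, 0.5], 's2': [3.0, 1.0]}
```

Mean pooling is just one possible choice; any reduction over a speaker's recordings fits the same N:1 lineage declaration.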

Conclusion

Metaxy provides a convenient API for modeling aggregation relationships: LineageRelationship.aggregation. Other Metaxy features, such as resolve_update, continue to work seamlessly with aggregation relationships.

Learn more about: