
One-to-Many Expansion

View Example Source on GitHub

This example demonstrates how to implement 1:N transformations with Metaxy. In such relationships, a single parent sample maps to multiple child samples.

In Metaxy, these relationships are modeled with the LineageRelationship.expansion lineage type.

We will use a hypothetical video chunking pipeline as an example. Along the way, we will also show that other Metaxy features, such as field mappings, work with non-standard lineage types.
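Before diving into the pipeline, here is a minimal plain-Python sketch (no Metaxy involved) of what a 1:N expansion looks like: one parent record fans out into several child records that all carry the parent's key.

```python
# Plain-Python sketch of a 1:N expansion: one parent sample
# fans out into several child samples.
parent = {"video_id": "v1", "path": "v1.mp4"}

# Every child keeps the parent's key ("video_id") next to its own key;
# the shared key is exactly what an expansion lineage joins on.
chunks = [
    {"video_id": parent["video_id"], "video_chunk_id": f"{parent['video_id']}_{i}"}
    for i in range(3)
]

print(len(chunks))            # 3 children for one parent
print(chunks[0]["video_id"])  # v1
```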

The Pipeline

We are going to define a typical video processing pipeline with three features:

---
title: Feature Graph
---
flowchart TB
    %%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
        video_raw["<div style='text-align:left'><b>video/raw</b><br/><font color='#999'>---</font><br/>• audio<br/>• frames</div>"]
        video_chunk["<div style='text-align:left'><b>video/chunk</b><br/><font color='#999'>---</font><br/>• audio<br/>• frames</div>"]
        video_faces["<div style='text-align:left'><b>video/faces</b><br/><font color='#999'>---</font><br/>• faces</div>"]
        video_raw --> video_chunk
        video_chunk --> video_faces

Defining features: Video

Each video-like feature in our pipeline is going to have two fields: audio and frames.

Let's set the code version of audio to "1" so that we can bump it later. The frames field will use the default code version.

src/example_one_to_many/features.py
import metaxy as mx


class Video(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key="video/raw",
        id_columns=["video_id"],
        fields=[
            mx.FieldSpec(key="audio", code_version="1"),
            "frames",
        ],
    ),
):
    video_id: str
    path: str  # where the video is stored

Defining features: VideoChunk

VideoChunk represents a piece of the upstream Video feature. Since each Video sample can be split into multiple chunks, we need to tell Metaxy how to map each chunk to its parent video.

src/example_one_to_many/features.py
class VideoChunk(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "chunk"],
        id_columns=["video_chunk_id"],
        deps=[mx.FeatureDep(feature=Video)],
        fields=["audio", "frames"],
        lineage=mx.LineageRelationship.expansion(on=["video_id"]),
    ),
):
    video_id: str  # points to the parent video
    video_chunk_id: str
    path: str  # where the video chunk is stored

We do not specify custom code versions on its fields. Metaxy automatically assigns field-level dependencies by matching on field names: VideoChunk.frames depends on Video.frames, and VideoChunk.audio depends on Video.audio.
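Conceptually, the automatic matching behaves like the following plain-Python sketch (our illustration, not Metaxy's actual implementation): each downstream field is paired with the upstream field that shares its name.

```python
# Sketch of name-based field matching (illustration only, not Metaxy internals).
video_fields = ["audio", "frames"]        # upstream Video fields
video_chunk_fields = ["audio", "frames"]  # downstream VideoChunk fields

# Pair each downstream field with the upstream field of the same name.
auto_deps = {
    child: parent
    for child in video_chunk_fields
    for parent in video_fields
    if child == parent
}
print(auto_deps)  # {'audio': 'audio', 'frames': 'frames'}
```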

Defining features: FaceRecognition

FaceRecognition processes video chunks and only depends on the frames field. This can be expressed with a SpecificFieldsMapping.

src/example_one_to_many/features.py
class FaceRecognition(
    mx.BaseFeature,
    spec=mx.FeatureSpec(
        key=["video", "faces"],
        id_columns=["video_chunk_id"],
        deps=[
            mx.FeatureDep(
                feature=VideoChunk,
                fields_mapping=mx.FieldsMapping.specific(
                    mapping={mx.FieldKey("faces"): {mx.FieldKey("frames")}}
                ),
            )
        ],
        fields=["faces"],
    ),
):
    video_chunk_id: str
    num_faces: int  # number of faces detected

This completes the feature definitions. Let's proceed to running the pipeline.

Walkthrough

Here is a toy pipeline for computing the feature graph described above:

pipeline.py
import random

import narwhals as nw
import polars as pl
from example_one_to_many.features import FaceRecognition, Video, VideoChunk
from example_one_to_many.utils import split_video_into_chunks

from metaxy import init_metaxy


def main():
    cfg = init_metaxy()
    store = cfg.get_store("dev")

    # let's pretend somebody has already created the videos for us
    samples = pl.DataFrame(
        {
            "video_id": [1, 2, 3],
            "path": ["video1.mp4", "video2.mp4", "video3.mp4"],
            "metaxy_provenance_by_field": [
                {"audio": "v1", "frames": "v1"},
                {"audio": "v2", "frames": "v2"},
                {"audio": "v3", "frames": "v3"},
            ],
        }
    )

    with store:
        # showcase: resolve incremental update for a root feature
        diff = store.resolve_update(Video, samples=nw.from_native(samples))
        if len(diff.added) > 0:
            print(f"Found {len(diff.added)} new videos")
            store.write_metadata(Video, diff.added)

    # now we are going to resolve the videos that have to be split to chunks
    with store:
        diff = store.resolve_update(VideoChunk)
        # the DataFrame dimensions match Video's (with the ID column renamed)

        print(
            f"Found {len(diff.added)} videos and {len(diff.changed)} videos that need chunking"
        )

        for row_dict in pl.concat(
            [diff.added.to_polars(), diff.changed.to_polars()]
        ).iter_rows(named=True):
            print(f"Processing video: {row_dict}")
            # let's split each video into 3-5 chunks randomly

            video_id = row_dict["video_id"]
            path = row_dict["path"]

            provenance_by_field = row_dict["metaxy_provenance_by_field"]
            provenance = row_dict["metaxy_provenance"]

            # pretend we split the video into chunks
            chunk_paths = split_video_into_chunks(path)

            # Generate chunk IDs based on the parent video ID
            chunk_ids = [f"{video_id}_{i}" for i in range(len(chunk_paths))]

            # write the chunks to the store
            # CRUCIAL: all the chunks **must share the same provenance values**
            chunk_df = pl.DataFrame(
                {
                    "video_id": [video_id] * len(chunk_paths),
                    "video_chunk_id": chunk_ids,
                    "path": chunk_paths,
                    "metaxy_provenance_by_field": [provenance_by_field]
                    * len(chunk_paths),
                    "metaxy_provenance": [provenance] * len(chunk_paths),
                }
            )
            print(f"Writing {len(chunk_paths)} chunks for video {video_id}")
            store.write_metadata(VideoChunk, nw.from_native(chunk_df))

    # now process face recognition on video chunks
    with store:
        diff = store.resolve_update(FaceRecognition)
        print(
            f"Found {len(diff.added)} video chunks and {len(diff.changed)} video chunks that need face recognition"
        )

        if len(diff.added) > 0:
            # simulate face detection on each chunk
            face_data = []
            for row_dict in pl.concat(
                [diff.added.to_polars(), diff.changed.to_polars()]
            ).iter_rows(named=True):
                video_chunk_id = row_dict["video_chunk_id"]
                provenance_by_field = row_dict["metaxy_provenance_by_field"]
                provenance = row_dict["metaxy_provenance"]

                # simulate detecting random number of faces
                num_faces = random.randint(0, 10)

                face_data.append(
                    {
                        "video_chunk_id": video_chunk_id,
                        "num_faces": num_faces,
                        "metaxy_provenance_by_field": provenance_by_field,
                        "metaxy_provenance": provenance,
                    }
                )

            face_df = pl.DataFrame(face_data)
            print(f"Writing face recognition results for {len(face_data)} chunks")
            store.write_metadata(FaceRecognition, nw.from_native(face_df))


if __name__ == "__main__":
    main()
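The split_video_into_chunks helper is imported from example_one_to_many.utils; its real implementation lives in the example repository. A hypothetical stand-in that matches how the pipeline uses it (producing 3-5 chunk paths per video) might look like this:

```python
import random


def split_video_into_chunks(path: str) -> list[str]:
    """Hypothetical stand-in for example_one_to_many.utils.split_video_into_chunks.

    Pretends to split a video into 3-5 chunks and returns their paths.
    """
    stem = path.rsplit(".", 1)[0]
    return [f"{stem}_chunk{i}.mp4" for i in range(random.randint(3, 5))]


chunk_paths = split_video_into_chunks("video1.mp4")
print(len(chunk_paths))  # between 3 and 5
```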

Step 1: Launch Initial Run

Run the pipeline to create videos, chunks, and face recognition results:

python pipeline.py

Output:

Found 3 new videos
Found 3 videos and 0 videos that need chunking
Processing video: {'video_id': 1, ...}
Writing 4 chunks for video 1
Processing video: {'video_id': 2, ...}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, ...}
Writing 5 chunks for video 3
Found 12 video chunks and 0 video chunks that need face recognition
Writing face recognition results for 12 chunks

All three features have been materialized. Note that the VideoChunk feature may dynamically create as many samples as needed: Metaxy doesn't need to know the number of children in advance, only the relationship type.

Step 2: Verify Idempotency

Run the pipeline again without any changes:

python pipeline.py

Output:

Found 0 videos and 0 videos that need chunking
Found 0 video chunks and 0 video chunks that need face recognition

Nothing needs recomputation: the system correctly detects that no changes occurred.

Step 3: Change Audio Code Version

Now let's bump the code version on the audio field of Video feature:

patches/01_update_video_code_version.patch
--- a/src/example_one_to_many/features.py
+++ b/src/example_one_to_many/features.py
@@ -9,6 +9,6 @@ class Video(
         id_columns=["video_id"],
         fields=[
-            mx.FieldSpec(key="audio", code_version="1"),
+            mx.FieldSpec(key="audio", code_version="2"),
             "frames",
         ],
     ),

This represents updating the audio processing algorithm, and therefore the audio data.

Step 4: Observe Field-Level Tracking

Run the pipeline again after the code change:

python pipeline.py

Output:

Found 3 new videos
Found 3 videos and 0 videos that need chunking
Processing video: {'video_id': 1, ...}
Writing 3 chunks for video 1
Processing video: {'video_id': 2, ...}
Writing 5 chunks for video 2
Processing video: {'video_id': 3, ...}
Writing 4 chunks for video 3
Found 0 video chunks and 0 video chunks that need face recognition

Key observation:

  • VideoChunk was recomputed because its audio field was affected by the upstream change
  • FaceRecognition did not require a recompute, because it only depends on the frames field, which did not change
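The selective recompute can be sketched in plain Python (the provenance strings below are made up for illustration; Metaxy's real encoding differs): compare per-field provenance and only invalidate consumers whose required fields changed.

```python
# Simplified sketch of field-level change detection.
# The provenance strings are ours; Metaxy's actual hashes differ.
stored = {"audio": "code=1|data=v1", "frames": "code=default|data=v1"}
current = {"audio": "code=2|data=v1", "frames": "code=default|data=v1"}

changed_fields = {field for field in current if current[field] != stored[field]}
print(changed_fields)  # {'audio'}

# FaceRecognition only reads "frames", so the audio bump does not affect it.
face_recognition_reads = {"frames"}
needs_recompute = bool(changed_fields & face_recognition_reads)
print(needs_recompute)  # False
```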

Conclusion

Metaxy provides a convenient API for modeling 1:N relationships: LineageRelationship.expansion. Other Metaxy features such as field-level versioning continue to work seamlessly when declaring 1:N relationships.
