One-to-Many Expansion¶
This example demonstrates how to implement 1:N transformations with Metaxy.
In such relationships a single parent sample can map into multiple child samples.
In Metaxy, such relationships can be modeled with the LineageRelationship.expansion lineage type.
We will use a hypothetical video chunking pipeline as an example, and along the way demonstrate that other Metaxy features, such as field mappings, keep working with non-standard lineage types.
The Pipeline¶
We are going to define a typical video processing pipeline with three features:
---
title: Feature Graph
---
flowchart TB
%%{init: {'flowchart': {'htmlLabels': true, 'curve': 'basis'}, 'themeVariables': {'fontSize': '14px'}}}%%
video_raw["<div style="text-align:left"><b>video/raw</b><br/><font color="#999">---</font><br/>• audio<br/>• frames</div>"]
video_chunk["<div style="text-align:left"><b>video/chunk</b><br/><font color="#999">---</font><br/>• audio<br/>• frames</div>"]
video_faces["<div style="text-align:left"><b>video/faces</b><br/><font color="#999">---</font><br/>• faces</div>"]
video_raw --> video_chunk
video_chunk --> video_faces
Defining features: Video¶
Each video-like feature in our pipeline is going to have two fields: audio and frames.
Let's set the code version of audio to "1" so that we can bump it later in this walkthrough.
The frames field will use the default code version.
import metaxy as mx
class Video(
mx.BaseFeature,
spec=mx.FeatureSpec(
key="video/raw",
id_columns=["video_id"],
fields=[
mx.FieldSpec(key="audio", code_version="1"),
"frames",
],
),
):
video_id: str
path: str # where the video is stored
Defining features: VideoChunk¶
VideoChunk represents a piece of the upstream Video feature. Since each Video sample can be split into multiple chunks, we need to tell Metaxy how to map each chunk to its parent video.
class VideoChunk(
mx.BaseFeature,
spec=mx.FeatureSpec(
key=["video", "chunk"],
id_columns=["video_chunk_id"],
deps=[mx.FeatureDep(feature=Video)],
fields=["audio", "frames"],
lineage=mx.LineageRelationship.expansion(on=["video_id"]),
),
):
video_id: str # points to the parent video
video_chunk_id: str
path: str # where the video chunk is stored
We do not specify custom versions on its fields. Metaxy automatically infers field-level dependencies by matching field names: VideoChunk.frames depends on Video.frames, and VideoChunk.audio depends on Video.audio.
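The name-based matching rule can be pictured without Metaxy at all. The sketch below is illustrative only: the field names mirror the example, but the plain-dict shape is an assumption, not Metaxy's internal representation.

```python
parent_fields = ["audio", "frames"]  # fields on Video
child_fields = ["audio", "frames"]   # fields on VideoChunk

# default rule: a child field depends on the parent field with the same name
auto_mapping = {child: {child} for child in child_fields if child in parent_fields}

assert auto_mapping == {"audio": {"audio"}, "frames": {"frames"}}
```

If a child field has no same-named parent field, explicit mapping (as shown for FaceRecognition below) is the way to declare its dependencies.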
Defining features: FaceRecognition¶
FaceRecognition processes video chunks and only depends on the frames field. This can be expressed with a SpecificFieldsMapping.
class FaceRecognition(
mx.BaseFeature,
spec=mx.FeatureSpec(
key=["video", "faces"],
id_columns=["video_chunk_id"],
deps=[
mx.FeatureDep(
feature=VideoChunk,
fields_mapping=mx.FieldsMapping.specific(
mapping={mx.FieldKey("faces"): {mx.FieldKey("frames")}}
),
)
],
fields=["faces"],
),
):
video_chunk_id: str
num_faces: int # number of faces detected
This completes the feature definitions. Let's proceed to running the pipeline.
Walkthrough¶
Here is a toy pipeline for computing the feature graph described above:
pipeline.py
import random
import narwhals as nw
import polars as pl
from example_one_to_many.features import FaceRecognition, Video, VideoChunk
from example_one_to_many.utils import split_video_into_chunks
from metaxy import init_metaxy
def main():
cfg = init_metaxy()
store = cfg.get_store("dev")
# let's pretend somebody has already created the videos for us
samples = pl.DataFrame(
{
"video_id": [1, 2, 3],
"path": ["video1.mp4", "video2.mp4", "video3.mp4"],
"metaxy_provenance_by_field": [
{"audio": "v1", "frames": "v1"},
{"audio": "v2", "frames": "v2"},
{"audio": "v3", "frames": "v3"},
],
}
)
with store:
# showcase: resolve incremental update for a root feature
diff = store.resolve_update(Video, samples=nw.from_native(samples))
if len(diff.added) > 0:
print(f"Found {len(diff.added)} new videos")
store.write_metadata(Video, diff.added)
# now we are going to resolve the videos that have to be split into chunks
with store:
diff = store.resolve_update(VideoChunk)
# the DataFrame dimensions match Video's (with the ID column renamed)
print(
f"Found {len(diff.added)} videos and {len(diff.changed)} videos that need chunking"
)
for row_dict in pl.concat(
[diff.added.to_polars(), diff.changed.to_polars()]
).iter_rows(named=True):
print(f"Processing video: {row_dict}")
# let's split each video into 3-5 chunks at random
video_id = row_dict["video_id"]
path = row_dict["path"]
provenance_by_field = row_dict["metaxy_provenance_by_field"]
provenance = row_dict["metaxy_provenance"]
# pretend we split the video into chunks
chunk_paths = split_video_into_chunks(path)
# Generate chunk IDs based on the parent video ID
chunk_ids = [f"{video_id}_{i}" for i in range(len(chunk_paths))]
# write the chunks to the store
# CRUCIAL: all the chunks **must share the same provenance values**
chunk_df = pl.DataFrame(
{
"video_id": [video_id] * len(chunk_paths),
"video_chunk_id": chunk_ids,
"path": chunk_paths,
"metaxy_provenance_by_field": [provenance_by_field]
* len(chunk_paths),
"metaxy_provenance": [provenance] * len(chunk_paths),
}
)
print(f"Writing {len(chunk_paths)} chunks for video {video_id}")
store.write_metadata(VideoChunk, nw.from_native(chunk_df))
# now process face recognition on video chunks
with store:
diff = store.resolve_update(FaceRecognition)
print(
f"Found {len(diff.added)} video chunks and {len(diff.changed)} video chunks that need face recognition"
)
if len(diff.added) > 0:
# simulate face detection on each chunk
face_data = []
for row_dict in pl.concat(
[diff.added.to_polars(), diff.changed.to_polars()]
).iter_rows(named=True):
video_chunk_id = row_dict["video_chunk_id"]
provenance_by_field = row_dict["metaxy_provenance_by_field"]
provenance = row_dict["metaxy_provenance"]
# simulate detecting random number of faces
num_faces = random.randint(0, 10)
face_data.append(
{
"video_chunk_id": video_chunk_id,
"num_faces": num_faces,
"metaxy_provenance_by_field": provenance_by_field,
"metaxy_provenance": provenance,
}
)
face_df = pl.DataFrame(face_data)
print(f"Writing face recognition results for {len(face_data)} chunks")
store.write_metadata(FaceRecognition, nw.from_native(face_df))
if __name__ == "__main__":
main()
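The helper split_video_into_chunks is imported from example_one_to_many.utils and is not shown above. A minimal stand-in, consistent with the behavior the pipeline assumes (3-5 chunk paths per video, chosen at random), could look like this; the exact signature and naming scheme are assumptions:

```python
import random


def split_video_into_chunks(path: str) -> list[str]:
    """Pretend to split a video into 3-5 chunks; return the chunk paths."""
    stem = path.rsplit(".", 1)[0]
    num_chunks = random.randint(3, 5)
    return [f"{stem}_chunk_{i}.mp4" for i in range(num_chunks)]
```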
Step 1: Launch Initial Run¶
Run the pipeline to create videos, chunks, and face recognition results:
Output:
Found 3 new videos
Found 3 videos and 0 videos that need chunking
Processing video: {'video_id': 1, ...}
Writing 4 chunks for video 1
Processing video: {'video_id': 2, ...}
Writing 3 chunks for video 2
Processing video: {'video_id': 3, ...}
Writing 5 chunks for video 3
Found 12 video chunks and 0 video chunks that need face recognition
Writing face recognition results for 12 chunks
All three features have been materialized. Note that the VideoChunk feature may dynamically create as many samples per parent as needed: Metaxy doesn't need to know the fan-out in advance, only the relationship type.
Step 2: Verify Idempotency¶
Run the pipeline again without any changes:
Output:
Found 0 videos and 0 videos that need chunking
Found 0 video chunks and 0 video chunks that need face recognition
Nothing needs recomputation: the system correctly detects that nothing changed.
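Conceptually, idempotency falls out of provenance comparison: a sample is only scheduled when its computed provenance is not already recorded in the store. A library-free sketch of that check (the tuples are illustrative, not Metaxy's storage format):

```python
# (video_id, provenance) pairs already recorded in the store
stored = {(1, "p1"), (2, "p2"), (3, "p3")}
# provenance computed from the current inputs and code versions
incoming = {(1, "p1"), (2, "p2"), (3, "p3")}

# only samples with new or changed provenance need work
to_recompute = incoming - stored
assert to_recompute == set()  # unchanged run -> empty diff
```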
Step 3: Change Audio Code Version¶
Now let's bump the code version on the audio field of Video feature:
patches/01_update_video_code_version.patch
This represents updating the audio processing algorithm, and therefore the audio data.
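The patch file itself is not inlined here; it presumably amounts to a one-line change along these lines (the new version string "2" is an assumption):

```diff
-            mx.FieldSpec(key="audio", code_version="1"),
+            mx.FieldSpec(key="audio", code_version="2"),
```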
Step 4: Observe Field-Level Tracking¶
Run the pipeline again after the code change:
Output:
Found 3 new videos
Found 3 videos and 0 videos that need chunking
Processing video: {'video_id': 1, ...}
Writing 3 chunks for video 1
Processing video: {'video_id': 2, ...}
Writing 5 chunks for video 2
Processing video: {'video_id': 3, ...}
Writing 4 chunks for video 3
Found 0 video chunks and 0 video chunks that need face recognition
Key observation:
- VideoChunk has been recomputed, since its audio field was affected by the upstream change
- FaceRecognition did not require a recompute, because it only depends on the frames field (which did not change)
Conclusion¶
Metaxy provides a convenient API for modeling 1:N relationships: LineageRelationship.expansion. Other Metaxy features such as field-level versioning continue to work seamlessly when declaring 1:N relationships.
Related materials¶
Learn more about: