Skip to content

Feature Graph

FeatureGraph is a global "God" object that holds all the features loaded by Metaxy via the feature discovery mechanism.

Users may interact with FeatureGraph when writing custom migrations, otherwise they are not exposed to it.

FeatureGraph

FeatureGraph()
Source code in src/metaxy/models/feature.py
def __init__(self):
    self.features_by_key: dict[FeatureKey, type[BaseFeature]] = {}
    self.feature_specs_by_key: dict[FeatureKey, FeatureSpecWithIDColumns] = {}

Attributes

snapshot_version property

snapshot_version: str

Generate a snapshot version representing the current topology + versions of the feature graph

Functions

add_feature

add_feature(feature: type[BaseFeature]) -> None

Add a feature to the graph.

Parameters:

Raises:

  • ValueError

    If a feature with the same key is already registered or if duplicate column names would result from renaming operations

Source code in src/metaxy/models/feature.py
def add_feature(self, feature: type["BaseFeature"]) -> None:
    """Add a feature to the graph.

    Args:
        feature: Feature class to register

    Raises:
        ValueError: If a feature with the same key is already registered
                   or if duplicate column names would result from renaming operations
    """
    if feature.spec().key in self.features_by_key:
        existing = self.features_by_key[feature.spec().key]
        raise ValueError(
            f"Feature with key {feature.spec().key.to_string()} already registered. "
            f"Existing: {existing.__name__}, New: {feature.__name__}. "
            f"Each feature key must be unique within a graph."
        )

    # Validate that there are no duplicate column names across dependencies after renaming
    if feature.spec().deps:
        self._validate_no_duplicate_columns(feature.spec())

    self.features_by_key[feature.spec().key] = feature
    self.feature_specs_by_key[feature.spec().key] = feature.spec()

remove_feature

remove_feature(key: FeatureKey) -> None

Remove a feature from the graph.

Parameters:

Raises:

  • KeyError

    If no feature with the given key is registered

Source code in src/metaxy/models/feature.py
def remove_feature(self, key: FeatureKey) -> None:
    """Remove a feature from the graph.

    Args:
        key: Feature key to remove

    Raises:
        KeyError: If no feature with the given key is registered
    """
    if key not in self.features_by_key:
        raise KeyError(
            f"No feature with key {key.to_string()} found in graph. "
            f"Available keys: {[k.to_string() for k in self.features_by_key.keys()]}"
        )

    del self.features_by_key[key]
    del self.feature_specs_by_key[key]

get_feature_by_key

get_feature_by_key(key: FeatureKey) -> type[BaseFeature]

Get a feature class by its key.

Parameters:

Returns:

Raises:

  • KeyError

    If no feature with the given key is registered

Example
graph = FeatureGraph.get_active()
parent_key = FeatureKey(["examples", "parent"])
ParentFeature = graph.get_feature_by_key(parent_key)
Source code in src/metaxy/models/feature.py
def get_feature_by_key(self, key: FeatureKey) -> type["BaseFeature"]:
    """Get a feature class by its key.

    Args:
        key: Feature key to look up

    Returns:
        Feature class

    Raises:
        KeyError: If no feature with the given key is registered

    Example:
        ```py
        graph = FeatureGraph.get_active()
        parent_key = FeatureKey(["examples", "parent"])
        ParentFeature = graph.get_feature_by_key(parent_key)
        ```
    """
    if key not in self.features_by_key:
        raise KeyError(
            f"No feature with key {key.to_string()} found in graph. "
            f"Available keys: {[k.to_string() for k in self.features_by_key.keys()]}"
        )
    return self.features_by_key[key]

get_feature_version_by_field

get_feature_version_by_field(key: FeatureKey) -> dict[str, str]

Computes the field provenance map for a feature.

Hash together field provenance entries with the feature code version.

Returns:

  • dict[str, str]

    dict[str, str]: The provenance hash for each field in the feature plan. Keys are field names as strings.

Source code in src/metaxy/models/feature.py
def get_feature_version_by_field(self, key: FeatureKey) -> dict[str, str]:
    """Computes the field provenance map for a feature.

    Hash together field provenance entries with the feature code version.

    Returns:
        dict[str, str]: The provenance hash for each field in the feature plan.
            Keys are field names as strings.
    """
    res = {}

    plan = self.get_feature_plan(key)

    for k, v in plan.feature.fields_by_key.items():
        res[k.to_string()] = self.get_field_version(
            FQFieldKey(field=k, feature=key)
        )

    return res

get_feature_version

get_feature_version(key: FeatureKey) -> str

Computes the feature version as a single string

Source code in src/metaxy/models/feature.py
def get_feature_version(self, key: FeatureKey) -> str:
    """Computes the feature version as a single string"""
    hasher = hashlib.sha256()
    provenance_by_field = self.get_feature_version_by_field(key)
    for field_key in sorted(provenance_by_field):
        hasher.update(field_key.encode())
        hasher.update(provenance_by_field[field_key].encode())

    return truncate_hash(hasher.hexdigest())

get_downstream_features

get_downstream_features(sources: list[FeatureKey]) -> list[FeatureKey]

Get all features downstream of sources, topologically sorted.

Performs a depth-first traversal of the dependency graph to find all features that transitively depend on any of the source features.

Parameters:

Returns:

  • list[FeatureKey]

    List of downstream feature keys in topological order (dependencies first).

  • list[FeatureKey]

    Does not include the source features themselves.

Example
# DAG: A -> B -> D
#      A -> C -> D
graph.get_downstream_features([FeatureKey(["A"])])
# [FeatureKey(["B"]), FeatureKey(["C"]), FeatureKey(["D"])]
Source code in src/metaxy/models/feature.py
def get_downstream_features(self, sources: list[FeatureKey]) -> list[FeatureKey]:
    """Get all features downstream of sources, topologically sorted.

    Performs a depth-first traversal of the dependency graph to find all
    features that transitively depend on any of the source features.

    Args:
        sources: List of source feature keys

    Returns:
        List of downstream feature keys in topological order (dependencies first).
        Does not include the source features themselves.

    Example:
        ```py
        # DAG: A -> B -> D
        #      A -> C -> D
        graph.get_downstream_features([FeatureKey(["A"])])
        # [FeatureKey(["B"]), FeatureKey(["C"]), FeatureKey(["D"])]
        ```
    """
    source_set = set(sources)
    visited = set()
    post_order = []  # Reverse topological order

    def visit(key: FeatureKey):
        """DFS traversal."""
        if key in visited:
            return
        visited.add(key)

        # Find all features that depend on this one
        for feature_key, feature_spec in self.feature_specs_by_key.items():
            if feature_spec.deps:
                for dep in feature_spec.deps:
                    if dep.feature == key:
                        # This feature depends on 'key', so visit it
                        visit(feature_key)

        post_order.append(key)

    # Visit all sources
    for source in sources:
        visit(source)

    # Remove sources from result, reverse to get topological order
    result = [k for k in reversed(post_order) if k not in source_set]
    return result

to_snapshot

to_snapshot() -> dict[str, dict[str, Any]]

Serialize graph to snapshot format.

Returns a dict mapping feature_key (string) to feature data dict, including the import path of the Feature class for reconstruction.

Returns:

  • dict[str, dict[str, Any]]

    Dict of feature_key -> { feature_spec: dict, metaxy_feature_version: str, metaxy_feature_spec_version: str, metaxy_feature_tracking_version: str, feature_class_path: str, project: str

  • dict[str, dict[str, Any]]

    }

Example
snapshot = graph.to_snapshot()
snapshot["video_processing"]["metaxy_feature_version"]
# 'abc12345'
snapshot["video_processing"]["metaxy_feature_spec_version"]
# 'def67890'
snapshot["video_processing"]["metaxy_feature_tracking_version"]
# 'xyz98765'
snapshot["video_processing"]["feature_class_path"]
# 'myapp.features.video.VideoProcessing'
snapshot["video_processing"]["project"]
# 'myapp'
Source code in src/metaxy/models/feature.py
def to_snapshot(self) -> dict[str, dict[str, Any]]:
    """Serialize graph to snapshot format.

    Returns a dict mapping feature_key (string) to feature data dict,
    including the import path of the Feature class for reconstruction.

    Returns:
        Dict of feature_key -> {
            feature_spec: dict,
            metaxy_feature_version: str,
            metaxy_feature_spec_version: str,
            metaxy_feature_tracking_version: str,
            feature_class_path: str,
            project: str
        }

    Example:
        ```py
        snapshot = graph.to_snapshot()
        snapshot["video_processing"]["metaxy_feature_version"]
        # 'abc12345'
        snapshot["video_processing"]["metaxy_feature_spec_version"]
        # 'def67890'
        snapshot["video_processing"]["metaxy_feature_tracking_version"]
        # 'xyz98765'
        snapshot["video_processing"]["feature_class_path"]
        # 'myapp.features.video.VideoProcessing'
        snapshot["video_processing"]["project"]
        # 'myapp'
        ```
    """
    snapshot = {}

    for feature_key, feature_cls in self.features_by_key.items():
        feature_key_str = feature_key.to_string()
        feature_spec_dict = feature_cls.spec().model_dump(mode="json")  # type: ignore[attr-defined]
        feature_version = feature_cls.feature_version()  # type: ignore[attr-defined]
        feature_spec_version = feature_cls.spec().feature_spec_version  # type: ignore[attr-defined]
        feature_tracking_version = feature_cls.feature_tracking_version()  # type: ignore[attr-defined]
        project = feature_cls.project  # type: ignore[attr-defined]

        # Get class import path (module.ClassName)
        class_path = f"{feature_cls.__module__}.{feature_cls.__name__}"

        snapshot[feature_key_str] = {
            "feature_spec": feature_spec_dict,
            FEATURE_VERSION_COL: feature_version,
            FEATURE_SPEC_VERSION_COL: feature_spec_version,
            FEATURE_TRACKING_VERSION_COL: feature_tracking_version,
            "feature_class_path": class_path,
            "project": project,
        }

    return snapshot

from_snapshot classmethod

from_snapshot(snapshot_data: dict[str, dict[str, Any]], *, class_path_overrides: dict[str, str] | None = None, force_reload: bool = False) -> FeatureGraph

Reconstruct graph from snapshot by importing Feature classes.

Strictly requires Feature classes to exist at their recorded import paths. This ensures custom methods (like load_input) are available.

If a feature has been moved/renamed, use class_path_overrides to specify the new location.

Parameters:

  • snapshot_data (dict[str, dict[str, Any]]) –

    Dict of feature_key -> dict containing feature_spec (dict), feature_class_path (str), and other fields as returned by to_snapshot() or loaded from DB

  • class_path_overrides (dict[str, str] | None, default: None ) –

    Optional dict mapping feature_key to new class path for features that have been moved/renamed

  • force_reload (bool, default: False ) –

    If True, reload modules from disk to get current code state.

Returns:

  • FeatureGraph

    New FeatureGraph with historical features

Raises:

  • ImportError

    If feature class cannot be imported at recorded path

Example
# Load snapshot from metadata store
historical_graph = FeatureGraph.from_snapshot(snapshot_data)

# With override for moved feature
historical_graph = FeatureGraph.from_snapshot(
    snapshot_data,
    class_path_overrides={
        "video_processing": "myapp.features_v2.VideoProcessing"
    }
)
Source code in src/metaxy/models/feature.py
@classmethod
def from_snapshot(
    cls,
    snapshot_data: dict[str, dict[str, Any]],
    *,
    class_path_overrides: dict[str, str] | None = None,
    force_reload: bool = False,
) -> "FeatureGraph":
    """Reconstruct graph from snapshot by importing Feature classes.

    Strictly requires Feature classes to exist at their recorded import paths.
    This ensures custom methods (like load_input) are available.

    If a feature has been moved/renamed, use class_path_overrides to specify
    the new location.

    Args:
        snapshot_data: Dict of feature_key -> dict containing
            feature_spec (dict), feature_class_path (str), and other fields
            as returned by to_snapshot() or loaded from DB
        class_path_overrides: Optional dict mapping feature_key to new class path
                             for features that have been moved/renamed
        force_reload: If True, reload modules from disk to get current code state.

    Returns:
        New FeatureGraph with historical features

    Raises:
        ImportError: If feature class cannot be imported at recorded path

    Example:
        ```py
        # Load snapshot from metadata store
        historical_graph = FeatureGraph.from_snapshot(snapshot_data)

        # With override for moved feature
        historical_graph = FeatureGraph.from_snapshot(
            snapshot_data,
            class_path_overrides={
                "video_processing": "myapp.features_v2.VideoProcessing"
            }
        )
        ```
    """
    import importlib
    import sys

    graph = cls()
    class_path_overrides = class_path_overrides or {}

    # If force_reload, collect all module paths first to remove ALL features
    # from those modules before reloading (modules can have multiple features)
    modules_to_reload = set()
    if force_reload:
        for feature_key_str, feature_data in snapshot_data.items():
            class_path = class_path_overrides.get(
                feature_key_str
            ) or feature_data.get("feature_class_path")
            if class_path:
                module_path, _ = class_path.rsplit(".", 1)
                if module_path in sys.modules:
                    modules_to_reload.add(module_path)

    # Use context manager to temporarily set the new graph as active
    # This ensures imported Feature classes register to the new graph, not the current one
    with graph.use():
        for feature_key_str, feature_data in snapshot_data.items():
            # Parse FeatureSpec for validation
            feature_spec_dict = feature_data["feature_spec"]
            FeatureSpec.model_validate(feature_spec_dict)

            # Get class path (check overrides first)
            if feature_key_str in class_path_overrides:
                class_path = class_path_overrides[feature_key_str]
            else:
                class_path = feature_data.get("feature_class_path")
                if not class_path:
                    raise ValueError(
                        f"Feature '{feature_key_str}' has no feature_class_path in snapshot. "
                        f"Cannot reconstruct historical graph."
                    )

            # Import the class
            try:
                module_path, class_name = class_path.rsplit(".", 1)

                # Force reload module from disk if requested
                # This is critical for migration detection - when code changes,
                # we need fresh imports to detect the changes
                if force_reload and module_path in modules_to_reload:
                    # Before first reload of this module, remove ALL features from this module
                    # (a module can define multiple features)
                    if module_path in modules_to_reload:
                        # Find all features from this module in snapshot and remove them
                        for fk_str, fd in snapshot_data.items():
                            fcp = class_path_overrides.get(fk_str) or fd.get(
                                "feature_class_path"
                            )
                            if fcp and fcp.rsplit(".", 1)[0] == module_path:
                                fspec_dict = fd["feature_spec"]
                                fspec = FeatureSpec.model_validate(fspec_dict)
                                if fspec.key in graph.features_by_key:
                                    graph.remove_feature(fspec.key)

                        # Mark module as processed so we don't remove features again
                        modules_to_reload.discard(module_path)

                    module = importlib.reload(sys.modules[module_path])
                else:
                    module = __import__(module_path, fromlist=[class_name])

                feature_cls = getattr(module, class_name)
            except (ImportError, AttributeError) as e:
                raise ImportError(
                    f"Cannot import Feature class '{class_path}' for feature graph reconstruction from snapshot. "
                    f"Feature '{feature_key_str}' is required to reconstruct the graph, but the class "
                    f"cannot be found at the recorded import path. "
                ) from e

            # Validate the imported class matches the stored spec
            if not hasattr(feature_cls, "spec"):
                raise TypeError(
                    f"Imported class '{class_path}' is not a valid Feature class "
                    f"(missing 'spec' attribute)"
                )

            # Register the imported feature to this graph if not already present
            # If the module was imported for the first time, the metaclass already registered it
            # If the module was previously imported, we need to manually register it
            if feature_cls.spec().key not in graph.features_by_key:
                graph.add_feature(feature_cls)

    return graph

get_active classmethod

get_active() -> FeatureGraph

Get the currently active graph.

Returns the graph from the context variable if set, otherwise returns the default global graph.

Returns:

Example
# Normal usage - returns default graph
reg = FeatureGraph.get_active()

# With custom graph in context
with my_graph.use():
    reg = FeatureGraph.get_active()  # Returns my_graph
Source code in src/metaxy/models/feature.py
@classmethod
def get_active(cls) -> "FeatureGraph":
    """Get the currently active graph.

    Returns the graph from the context variable if set, otherwise returns
    the default global graph.

    Returns:
        Active FeatureGraph instance

    Example:
        ```py
        # Normal usage - returns default graph
        reg = FeatureGraph.get_active()

        # With custom graph in context
        with my_graph.use():
            reg = FeatureGraph.get_active()  # Returns my_graph
        ```
    """
    return _active_graph.get() or graph

set_active classmethod

set_active(reg: FeatureGraph) -> None

Set the active graph for the current context.

This sets the context variable that will be returned by get_active(). Typically used in application setup code or test fixtures.

Parameters:

Example
# In application setup
my_graph = FeatureGraph()
FeatureGraph.set_active(my_graph)

# Now all operations use my_graph
FeatureGraph.get_active()  # Returns my_graph
Source code in src/metaxy/models/feature.py
@classmethod
def set_active(cls, reg: "FeatureGraph") -> None:
    """Set the active graph for the current context.

    This sets the context variable that will be returned by get_active().
    Typically used in application setup code or test fixtures.

    Args:
        reg: FeatureGraph to activate

    Example:
        ```py
        # In application setup
        my_graph = FeatureGraph()
        FeatureGraph.set_active(my_graph)

        # Now all operations use my_graph
        FeatureGraph.get_active()  # Returns my_graph
        ```
    """
    _active_graph.set(reg)

use

use() -> Iterator[Self]

Context manager to temporarily use this graph as active.

This is the recommended way to use custom registries, especially in tests. The graph is automatically restored when the context exits.

Yields:

  • FeatureGraph ( Self ) –

    This graph instance

Example
test_graph = FeatureGraph()

with test_graph.use():
    # All operations use test_graph
    class TestFeature(Feature, spec=...):
        pass

# Outside context, back to previous graph
Source code in src/metaxy/models/feature.py
@contextmanager
def use(self) -> Iterator[Self]:
    """Context manager to temporarily use this graph as active.

    This is the recommended way to use custom registries, especially in tests.
    The graph is automatically restored when the context exits.

    Yields:
        FeatureGraph: This graph instance

    Example:
        ```py
        test_graph = FeatureGraph()

        with test_graph.use():
            # All operations use test_graph
            class TestFeature(Feature, spec=...):
                pass

        # Outside context, back to previous graph
        ```
    """
    token = _active_graph.set(self)
    try:
        yield self
    finally:
        _active_graph.reset(token)