Feature Graph¶
FeatureGraph is a global "God" object that holds all the features loaded by Metaxy via the feature discovery mechanism.
Users may interact with FeatureGraph when writing custom migrations; otherwise, they are not exposed to it.
FeatureGraph
¶
Source code in src/metaxy/models/feature.py
Attributes¶
snapshot_version
property
¶
snapshot_version: str
Generate a snapshot version representing the current topology and feature versions of the feature graph.
Functions¶
add_feature
¶
add_feature(feature: type[BaseFeature]) -> None
Add a feature to the graph.
Parameters:
-
feature(type[BaseFeature]) –Feature class to register
Raises:
-
ValueError–If a feature with the same key is already registered or if duplicate column names would result from renaming operations
Source code in src/metaxy/models/feature.py
def add_feature(self, feature: type["BaseFeature"]) -> None:
    """Register a feature class in this graph.

    The class is stored in ``features_by_key`` and its spec in
    ``feature_specs_by_key``, both keyed by ``feature.spec().key``.

    Args:
        feature: Feature class to register

    Raises:
        ValueError: If a feature with the same key is already registered
            or if duplicate column names would result from renaming operations
    """
    spec = feature.spec()
    key = spec.key

    if key in self.features_by_key:
        existing = self.features_by_key[key]
        raise ValueError(
            f"Feature with key {key.to_string()} already registered. "
            f"Existing: {existing.__name__}, New: {feature.__name__}. "
            "Each feature key must be unique within a graph."
        )

    # Column-collision validation is only relevant when the feature has
    # dependencies whose columns could clash after renaming.
    if spec.deps:
        self._validate_no_duplicate_columns(spec)

    self.feature_specs_by_key[key] = spec
    self.features_by_key[key] = feature
remove_feature
¶
remove_feature(key: FeatureKey) -> None
Remove a feature from the graph.
Parameters:
-
key(FeatureKey) –Feature key to remove
Raises:
-
KeyError–If no feature with the given key is registered
Source code in src/metaxy/models/feature.py
def remove_feature(self, key: FeatureKey) -> None:
    """Unregister the feature stored under ``key``.

    Both the class mapping (``features_by_key``) and the spec mapping
    (``feature_specs_by_key``) lose their entry for ``key``.

    Args:
        key: Feature key to remove

    Raises:
        KeyError: If no feature with the given key is registered
    """
    registered = self.features_by_key
    if key in registered:
        del registered[key]
        del self.feature_specs_by_key[key]
        return

    available = [k.to_string() for k in registered]
    raise KeyError(
        f"No feature with key {key.to_string()} found in graph. "
        f"Available keys: {available}"
    )
get_feature_by_key
¶
get_feature_by_key(key: FeatureKey) -> type[BaseFeature]
Get a feature class by its key.
Parameters:
-
key(FeatureKey) –Feature key to look up
Returns:
-
type[BaseFeature]–Feature class
Raises:
-
KeyError–If no feature with the given key is registered
Example
Source code in src/metaxy/models/feature.py
def get_feature_by_key(self, key: FeatureKey) -> type["BaseFeature"]:
    """Look up a registered feature class by its key.

    Args:
        key: Feature key to look up

    Returns:
        The feature class registered under ``key``.

    Raises:
        KeyError: If no feature with the given key is registered. The message
            lists every key currently known to the graph.

    Example:
        ```py
        graph = FeatureGraph.get_active()
        parent_key = FeatureKey(["examples", "parent"])
        ParentFeature = graph.get_feature_by_key(parent_key)
        ```
    """
    features = self.features_by_key
    if key in features:
        return features[key]

    known = [k.to_string() for k in features]
    raise KeyError(
        f"No feature with key {key.to_string()} found in graph. "
        f"Available keys: {known}"
    )
get_feature_version_by_field
¶
get_feature_version_by_field(key: FeatureKey) -> dict[str, str]
Computes the field provenance map for a feature.
Hash together field provenance entries with the feature code version.
Parameters:
-
key(FeatureKey) –Feature key whose field versions are computed
Returns:
-
dict[str, str]–The provenance hash for each field in the feature plan. Keys are field names as strings.
Source code in src/metaxy/models/feature.py
def get_feature_version_by_field(self, key: FeatureKey) -> dict[str, str]:
    """Computes the field provenance map for a feature.

    Every field of the feature's plan is mapped to the version returned by
    ``get_field_version`` for its fully-qualified field key.

    NOTE(review): the hashing itself (per the original description, field
    provenance combined with the feature code version) happens inside
    ``get_field_version``, which is not visible here.

    Args:
        key: Feature key whose field versions are computed.

    Returns:
        dict[str, str]: The provenance hash for each field in the feature plan.
            Keys are field names as strings (``to_string()`` of each field key).
    """
    plan = self.get_feature_plan(key)
    # Only the field keys are needed; the per-field specs are not used here.
    return {
        field_key.to_string(): self.get_field_version(
            FQFieldKey(field=field_key, feature=key)
        )
        for field_key in plan.feature.fields_by_key
    }
get_feature_version
¶
get_feature_version(key: FeatureKey) -> str
Computes the feature version as a single hash string derived from the per-field versions.
Source code in src/metaxy/models/feature.py
def get_feature_version(self, key: FeatureKey) -> str:
    """Computes the feature version as a single (truncated) hash string.

    The per-field versions from ``get_feature_version_by_field`` are fed into
    a SHA-256 hasher in field-name order (field name, then its version), and
    the hex digest is shortened with ``truncate_hash``.
    """
    by_field = self.get_feature_version_by_field(key)
    digest = hashlib.sha256()
    # Field names are unique, so sorting the items orders them by name only.
    for name, version in sorted(by_field.items()):
        # NOTE(review): entries are concatenated without separators; changing
        # this encoding would presumably change every previously stored version.
        digest.update(name.encode())
        digest.update(version.encode())
    return truncate_hash(digest.hexdigest())
get_downstream_features
¶
get_downstream_features(sources: list[FeatureKey]) -> list[FeatureKey]
Get all features downstream of sources, topologically sorted.
Performs a depth-first traversal of the dependency graph to find all features that transitively depend on any of the source features.
Parameters:
-
sources(list[FeatureKey]) –List of source feature keys
Returns:
-
list[FeatureKey]–List of downstream feature keys in topological order (dependencies first).
-
list[FeatureKey]–Does not include the source features themselves.
Example
Source code in src/metaxy/models/feature.py
def get_downstream_features(self, sources: list[FeatureKey]) -> list[FeatureKey]:
    """Get all features downstream of sources, topologically sorted.

    Performs a depth-first traversal of the dependency graph to find all
    features that transitively depend on any of the source features.

    A reverse-dependency index (feature -> features depending on it) is built
    once up front. The traversal therefore costs O(features + dependencies)
    instead of rescanning every feature spec at each visited node. The DFS
    uses an explicit stack, so deep dependency chains cannot exceed Python's
    recursion limit.

    Args:
        sources: List of source feature keys

    Returns:
        List of downstream feature keys in topological order (dependencies first).
        Does not include the source features themselves.

    Example:
        ```py
        # DAG: A -> B -> D
        #      A -> C -> D
        graph.get_downstream_features([FeatureKey(["A"])])
        # A topological order of B, C, D, e.g.
        # [FeatureKey(["B"]), FeatureKey(["C"]), FeatureKey(["D"])]
        # (the relative order of B and C depends on registration order)
        ```
    """
    # dependents[k] lists the features having a dep on k. They appear in
    # feature_specs_by_key order, which is the order the old per-node scan
    # found them in, so the visit order is unchanged.
    dependents: dict[FeatureKey, list[FeatureKey]] = {}
    for feature_key, feature_spec in self.feature_specs_by_key.items():
        for dep in feature_spec.deps or ():
            dependents.setdefault(dep.feature, []).append(feature_key)

    source_set = set(sources)
    visited: set[FeatureKey] = set()
    post_order: list[FeatureKey] = []  # Reverse topological order

    for source in sources:
        if source in visited:
            continue
        visited.add(source)
        # Each frame is (node, iterator over its remaining dependents). This
        # mirrors the recursive DFS exactly, including visit order.
        stack = [(source, iter(dependents.get(source, ())))]
        while stack:
            node, children = stack[-1]
            for child in children:
                if child not in visited:
                    visited.add(child)
                    stack.append((child, iter(dependents.get(child, ()))))
                    break
            else:
                # All dependents of `node` are finished; emit it after them.
                stack.pop()
                post_order.append(node)

    # Reversed post-order puts dependencies first; drop the sources themselves.
    return [k for k in reversed(post_order) if k not in source_set]
to_snapshot
¶
to_snapshot() -> dict[str, dict[str, Any]]
Serialize the graph to snapshot format.
Returns a dict mapping feature_key (string) to feature data dict, including the import path of the Feature class for reconstruction.
Returns:
-
dict[str, dict[str, Any]]–Dict of feature_key -> { feature_spec: dict, metaxy_feature_version: str, metaxy_feature_spec_version: str, metaxy_feature_tracking_version: str, feature_class_path: str, project: str }
Example
snapshot = graph.to_snapshot()
snapshot["video_processing"]["metaxy_feature_version"]
# 'abc12345'
snapshot["video_processing"]["metaxy_feature_spec_version"]
# 'def67890'
snapshot["video_processing"]["metaxy_feature_tracking_version"]
# 'xyz98765'
snapshot["video_processing"]["feature_class_path"]
# 'myapp.features.video.VideoProcessing'
snapshot["video_processing"]["project"]
# 'myapp'
Source code in src/metaxy/models/feature.py
def to_snapshot(self) -> dict[str, dict[str, Any]]:
    """Serialize the graph into a plain-dict snapshot.

    Every registered feature becomes one entry, keyed by the string form of
    its feature key. The entry records the class's import path so that the
    graph can later be rebuilt by importing the classes again.

    Returns:
        Dict of feature_key -> {
            feature_spec: dict,
            metaxy_feature_version: str,
            metaxy_feature_spec_version: str,
            metaxy_feature_tracking_version: str,
            feature_class_path: str,
            project: str
        }

    Example:
        ```py
        snapshot = graph.to_snapshot()
        snapshot["video_processing"]["metaxy_feature_version"]
        # 'abc12345'
        snapshot["video_processing"]["metaxy_feature_spec_version"]
        # 'def67890'
        snapshot["video_processing"]["metaxy_feature_tracking_version"]
        # 'xyz98765'
        snapshot["video_processing"]["feature_class_path"]
        # 'myapp.features.video.VideoProcessing'
        snapshot["video_processing"]["project"]
        # 'myapp'
        ```
    """

    def describe(feature_cls) -> dict[str, Any]:
        # Build a single snapshot record; key order matches the stored layout.
        spec = feature_cls.spec()  # type: ignore[attr-defined]
        spec_payload = spec.model_dump(mode="json")
        version = feature_cls.feature_version()  # type: ignore[attr-defined]
        spec_version = spec.feature_spec_version
        tracking_version = feature_cls.feature_tracking_version()  # type: ignore[attr-defined]
        owning_project = feature_cls.project  # type: ignore[attr-defined]
        return {
            "feature_spec": spec_payload,
            FEATURE_VERSION_COL: version,
            FEATURE_SPEC_VERSION_COL: spec_version,
            FEATURE_TRACKING_VERSION_COL: tracking_version,
            # Import path ("module.ClassName") used for reconstruction.
            "feature_class_path": f"{feature_cls.__module__}.{feature_cls.__name__}",
            "project": owning_project,
        }

    return {
        feature_key.to_string(): describe(feature_cls)
        for feature_key, feature_cls in self.features_by_key.items()
    }
from_snapshot
classmethod
¶
from_snapshot(snapshot_data: dict[str, dict[str, Any]], *, class_path_overrides: dict[str, str] | None = None, force_reload: bool = False) -> FeatureGraph
Reconstruct graph from snapshot by importing Feature classes.
Strictly requires Feature classes to exist at their recorded import paths. This ensures custom methods (like load_input) are available.
If a feature has been moved/renamed, use class_path_overrides to specify the new location.
Parameters:
-
snapshot_data(dict[str, dict[str, Any]]) –Dict of feature_key -> dict containing feature_spec (dict), feature_class_path (str), and other fields as returned by to_snapshot() or loaded from DB
-
class_path_overrides(dict[str, str] | None, default:None) –Optional dict mapping feature_key to new class path for features that have been moved/renamed
-
force_reload(bool, default:False) –If True, reload modules from disk to get current code state.
Returns:
-
FeatureGraph–New FeatureGraph with historical features
Raises:
-
ImportError–If feature class cannot be imported at recorded path
Example
Source code in src/metaxy/models/feature.py
@classmethod
def from_snapshot(
    cls,
    snapshot_data: dict[str, dict[str, Any]],
    *,
    class_path_overrides: dict[str, str] | None = None,
    force_reload: bool = False,
) -> "FeatureGraph":
    """Reconstruct graph from snapshot by importing Feature classes.

    Strictly requires Feature classes to exist at their recorded import paths.
    This ensures custom methods (like load_input) are available.
    If a feature has been moved/renamed, use class_path_overrides to specify
    the new location.

    Args:
        snapshot_data: Dict of feature_key -> dict containing
            feature_spec (dict), feature_class_path (str), and other fields
            as returned by to_snapshot() or loaded from DB
        class_path_overrides: Optional dict mapping feature_key to new class path
            for features that have been moved/renamed. An override always takes
            precedence over the recorded feature_class_path.
        force_reload: If True, reload modules from disk to get current code state.
            Only modules already present in ``sys.modules`` are reloaded; the
            others are imported normally.

    Returns:
        New FeatureGraph with historical features

    Raises:
        ValueError: If a snapshot entry has no class path (neither an override
            nor a recorded feature_class_path)
        ImportError: If feature class cannot be imported at recorded path,
            including when the path is not of the form ``module.ClassName``
        TypeError: If the imported object has no ``spec`` attribute

    Example:
        ```py
        # Load snapshot from metadata store
        historical_graph = FeatureGraph.from_snapshot(snapshot_data)
        # With override for moved feature
        historical_graph = FeatureGraph.from_snapshot(
            snapshot_data,
            class_path_overrides={
                "video_processing": "myapp.features_v2.VideoProcessing"
            }
        )
        ```
    """
    import importlib
    import sys

    graph = cls()
    class_path_overrides = class_path_overrides or {}

    def resolve_class_path(feature_key_str: str, feature_data: dict[str, Any]) -> str | None:
        # Single source of truth for path resolution: an explicit override
        # wins over the recorded import path.
        if feature_key_str in class_path_overrides:
            return class_path_overrides[feature_key_str]
        return feature_data.get("feature_class_path")

    def module_of(class_path: str) -> str:
        # "pkg.mod.Cls" -> "pkg.mod"; "" when there is no module part.
        return class_path.rpartition(".")[0]

    # If force_reload, collect all already-imported module paths first. This
    # lets ALL features from those modules be removed before reloading
    # (a module can define multiple features).
    modules_to_reload: set[str] = set()
    if force_reload:
        for feature_key_str, feature_data in snapshot_data.items():
            class_path = resolve_class_path(feature_key_str, feature_data)
            module_path = module_of(class_path) if class_path else ""
            if module_path and module_path in sys.modules:
                modules_to_reload.add(module_path)

    # Use context manager to temporarily set the new graph as active.
    # This ensures imported Feature classes register to the new graph, not the current one.
    with graph.use():
        for feature_key_str, feature_data in snapshot_data.items():
            # Parse FeatureSpec for validation (raises on a malformed spec).
            FeatureSpec.model_validate(feature_data["feature_spec"])

            class_path = resolve_class_path(feature_key_str, feature_data)
            if not class_path:
                raise ValueError(
                    f"Feature '{feature_key_str}' has no feature_class_path in snapshot. "
                    f"Cannot reconstruct historical graph."
                )

            module_path, _, class_name = class_path.rpartition(".")
            try:
                if not module_path:
                    raise ImportError(
                        f"'{class_path}' is not a dotted 'module.ClassName' path"
                    )
                # Force reload module from disk if requested. This is critical
                # for migration detection: when code changes, we need fresh
                # imports to detect the changes.
                if module_path in modules_to_reload:
                    # Before the first reload of this module, remove every
                    # snapshot feature defined in it from the new graph. The
                    # reload re-registers them, and add_feature would otherwise
                    # reject the duplicate keys.
                    for other_key_str, other_data in snapshot_data.items():
                        other_path = resolve_class_path(other_key_str, other_data)
                        if other_path and module_of(other_path) == module_path:
                            other_spec = FeatureSpec.model_validate(
                                other_data["feature_spec"]
                            )
                            if other_spec.key in graph.features_by_key:
                                graph.remove_feature(other_spec.key)
                    # Mark module as processed so it is reloaded only once.
                    modules_to_reload.discard(module_path)
                    module = importlib.reload(sys.modules[module_path])
                else:
                    module = __import__(module_path, fromlist=[class_name])
                feature_cls = getattr(module, class_name)
            except (ImportError, AttributeError) as e:
                raise ImportError(
                    f"Cannot import Feature class '{class_path}' for feature graph reconstruction from snapshot. "
                    f"Feature '{feature_key_str}' is required to reconstruct the graph, but the class "
                    f"cannot be found at the recorded import path. "
                    f"If the class was moved or renamed, pass its new location via class_path_overrides."
                ) from e

            # Minimal sanity check: the imported object must expose `spec`.
            # NOTE(review): its key is not compared against the snapshot key.
            if not hasattr(feature_cls, "spec"):
                raise TypeError(
                    f"Imported class '{class_path}' is not a valid Feature class "
                    f"(missing 'spec' attribute)"
                )

            # Register the imported feature to this graph if not already present.
            # If the module was imported for the first time, the metaclass already
            # registered it. If the module was previously imported, we need to
            # manually register it.
            if feature_cls.spec().key not in graph.features_by_key:
                graph.add_feature(feature_cls)

    return graph
get_active
classmethod
¶
get_active() -> FeatureGraph
Get the currently active graph.
Returns the graph from the context variable if set, otherwise returns the default global graph.
Returns:
-
FeatureGraph–Active FeatureGraph instance
Example
Source code in src/metaxy/models/feature.py
@classmethod
def get_active(cls) -> "FeatureGraph":
    """Get the currently active graph.

    Returns the graph from the context variable if one is set; otherwise
    returns the default global graph.

    Returns:
        Active FeatureGraph instance

    Example:
        ```py
        # Normal usage - returns default graph
        reg = FeatureGraph.get_active()
        # With custom graph in context
        with my_graph.use():
            reg = FeatureGraph.get_active()  # Returns my_graph
        ```
    """
    active = _active_graph.get()
    # Compare with None explicitly rather than relying on truthiness. A graph
    # that evaluates falsy (e.g. if the class defines __len__/__bool__ and is
    # empty) must not silently fall back to the default global graph.
    return graph if active is None else active
set_active
classmethod
¶
set_active(reg: FeatureGraph) -> None
Set the active graph for the current context.
This sets the context variable that will be returned by get_active(). Typically used in application setup code or test fixtures.
Parameters:
-
reg(FeatureGraph) –FeatureGraph to activate
Example
Source code in src/metaxy/models/feature.py
@classmethod
def set_active(cls, reg: "FeatureGraph") -> None:
"""Set the active graph for the current context.
This sets the context variable that will be returned by get_active().
Typically used in application setup code or test fixtures.
Args:
reg: FeatureGraph to activate
Example:
```py
# In application setup
my_graph = FeatureGraph()
FeatureGraph.set_active(my_graph)
# Now all operations use my_graph
FeatureGraph.get_active() # Returns my_graph
```
"""
_active_graph.set(reg)
use
¶
use() -> Iterator[Self]
Context manager to temporarily use this graph as active.
This is the recommended way to use custom graphs, especially in tests. The previously active graph is automatically restored when the context exits.
Yields:
-
FeatureGraph(Self) –This graph instance
Example
Source code in src/metaxy/models/feature.py
@contextmanager
def use(self) -> Iterator[Self]:
    """Temporarily make this graph the active one.

    This is the preferred way to work with a custom graph, in tests in
    particular. When the block exits, normally or through an exception, the
    previously active graph is restored.

    Yields:
        FeatureGraph: This graph instance

    Example:
        ```py
        test_graph = FeatureGraph()
        with test_graph.use():
            # All operations use test_graph
            class TestFeature(Feature, spec=...):
                pass
        # Outside context, back to previous graph
        ```
    """
    reset_token = _active_graph.set(self)
    try:
        yield self
    finally:
        # Undo exactly this set(), restoring whatever was active before it.
        _active_graph.reset(reset_token)