"""Operation protocol, ``@operation`` decorator, and registry.
An operation is a unit of generation that declares its scope,
dependencies, and an ``Options`` model for configuration
validation. The :func:`operation` decorator captures that
metadata and adds the class to an :class:`OperationRegistry` —
the engine reads everything it needs from the registry at build
time, so individual classes don't need to carry their metadata.
Production code relies on the process-wide :data:`DEFAULT_REGISTRY`
populated by entry-point-discovered modules at import time. Tests
can pass ``registry=<isolated>`` to :func:`operation` and
:class:`~foundry.engine.Engine` to keep their ops separate.
"""
from __future__ import annotations
import importlib.metadata
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter
from typing import Any, NamedTuple
from pydantic import BaseModel
ENTRY_POINT_GROUP = "foundry.operations"
# -------------------------------------------------------------------
# Metadata
# -------------------------------------------------------------------
# -------------------------------------------------------------------
# Default options
# -------------------------------------------------------------------
[docs]
class EmptyOptions(BaseModel):
"""Default options model for operations with no config."""
# -------------------------------------------------------------------
# Registry
# -------------------------------------------------------------------
class OperationEntry(NamedTuple):
"""Pre-resolved pair of metadata and operation class."""
meta: OperationMeta
cls: type
@dataclass
class OperationRegistry:
"""Collection of ``(meta, cls)`` entries with query helpers.
Populated by the :func:`operation` decorator at decoration
time. The engine reads entries from the registry to walk
scopes, group by scope, and topo-sort within a scope — it
never needs to look up metadata on individual classes.
Attributes:
entries: ``(meta, cls)`` pairs, in registration order.
"""
entries: list[OperationEntry] = field(default_factory=list)
def register(self, meta: OperationMeta, cls: type) -> None:
"""Append an :class:`OperationEntry` to :attr:`entries`."""
self.entries.append(OperationEntry(meta=meta, cls=cls))
def validate_scopes(self, known: set[str]) -> None:
"""Raise if any operation targets a scope outside *known*.
Args:
known: Set of scope names discovered from the config
model.
Raises:
ValueError: If an op's declared scope is not in
*known*.
"""
for entry in self.entries:
if entry.meta.scope not in known:
msg = (
f"Operation '{entry.meta.name}' targets "
f"scope '{entry.meta.scope}' which was not "
f"discovered from the config"
)
raise ValueError(msg)
def sorted_by_scope(self) -> dict[str, list[OperationEntry]]:
"""Group entries by scope and topo-sort each bucket.
Phase (pre vs post) is encoded on ``meta.after_children``
and split out at runtime by the engine; a single sorted
list per scope is enough. Scopes with no registered ops
are omitted — the engine uses ``dict.get(scope, [])``.
Returns:
Mapping from scope name to topo-sorted
``(meta, cls)`` entries.
"""
buckets: dict[str, list[OperationEntry]] = {}
for entry in self.entries:
buckets.setdefault(entry.meta.scope, []).append(entry)
return {name: _topo_sort(ops) for name, ops in buckets.items()}
#: Process-wide registry populated by :func:`operation` decorators
#: at import time. Production callers read from this after
#: triggering entry-point imports via :func:`discover_operations`.
DEFAULT_REGISTRY = OperationRegistry()
# -------------------------------------------------------------------
# Decorator
# -------------------------------------------------------------------
[docs]
def operation( # noqa: PLR0913
name: str,
*,
scope: str,
requires: list[str] | None = None,
after_children: bool = False,
dispatch_on: str | None = None,
registry: OperationRegistry = DEFAULT_REGISTRY,
) -> Any: # noqa: ANN401
"""Decorate a class as a kiln operation.
The decorated class must define:
- ``Options``: a :class:`pydantic.BaseModel` subclass
(defaults to :class:`EmptyOptions` if absent).
- ``build(self, ctx, options) -> list``: produces output
objects for the engine to collect.
Optionally it may define:
- ``when(self, ctx) -> bool``: when present and returning
``False``, the engine skips this operation for the
current build context. Use this for conditional
operations (e.g. auth, which only runs when the project
has auth configured).
Operations can also modify earlier operations' outputs by
inspecting :attr:`BuildContext.store` and mutating the
objects returned by :meth:`BuildStore.outputs_under` in
place. Combined with ``requires`` for ordering and ``when``
for activation, a single operation mechanism covers both
"produce" and "augment" roles.
Args:
name: Unique operation name.
scope: Scope name (e.g. ``"resource"``, ``"app"``,
``"project"``).
requires: Operation names that must run first.
after_children: When ``True`` (project scope only),
defer this operation until every child scope has
executed so ``build`` can walk child output in the
store. The engine rejects this flag at any other
scope.
dispatch_on: Attribute name on the scope instance to
compare against *name*. When set, the engine skips
the op unless ``getattr(ctx.instance, dispatch_on)
== name``. Designed for scopes whose instance is a
discriminated-union config (e.g. ``OperationConfig``
entries under a resource), where multiple ops share
one scope and each matches a single entry.
registry: Registry to register into. Defaults to the
process-wide :data:`DEFAULT_REGISTRY`; tests may
pass an isolated registry to keep their ops out of
the global namespace.
Returns:
Class decorator.
Example::
@operation("get", scope="resource")
class Get:
class Options(BaseModel):
fields: list[FieldSpec] | None = None
def build(self, ctx, options):
return [RouteHandler(...)]
"""
reqs = tuple(requires or [])
meta = OperationMeta(
name=name,
scope=scope,
requires=reqs,
after_children=after_children,
dispatch_on=dispatch_on,
)
def decorator(cls: type) -> type:
registry.register(meta, cls)
if not hasattr(cls, "Options"):
cls.Options = EmptyOptions
return cls
return decorator
def load_default_registry() -> OperationRegistry:
"""Return :data:`DEFAULT_REGISTRY`, loading entry-point ops first.
Any installed package can declare operations in its
``pyproject.toml``::
[project.entry-points."foundry.operations"]
my_op = "my_pkg.ops:MyOp"
Loading those modules fires their :func:`operation`
decorators, which populate :data:`DEFAULT_REGISTRY`. Used as
the default factory for :attr:`~foundry.engine.Engine.registry`
so ``Engine()`` just works; Python's import cache makes
repeat calls cheap.
"""
for entry_point in importlib.metadata.entry_points(group=ENTRY_POINT_GROUP):
entry_point.load()
return DEFAULT_REGISTRY
# -------------------------------------------------------------------
# Topological sort
# -------------------------------------------------------------------
def _topo_sort(entries: list[OperationEntry]) -> list[OperationEntry]:
"""Sort *entries* by dependency order.
Delegates to :class:`graphlib.TopologicalSorter` and breaks
ties alphabetically on operation name so output is
deterministic regardless of input order. Raises
:class:`ValueError` on cycles or missing dependencies.
"""
# Sorted by name so ties in the topo result are broken
# deterministically — TopologicalSorter.static_order yields
# siblings in insertion order.
ordered = sorted(entries, key=lambda entry: entry.meta.name)
graph: dict[str, set[str]] = {
entry.meta.name: set(entry.meta.requires) for entry in ordered
}
_validate_requires(graph)
by_name: dict[str, OperationEntry] = {
entry.meta.name: entry for entry in ordered
}
try:
return [
by_name[name] for name in TopologicalSorter(graph).static_order()
]
except CycleError as exc:
msg = "Cycle detected in operation dependencies"
raise ValueError(msg) from exc
def _validate_requires(graph: dict[str, set[str]]) -> None:
"""Raise if any declared dependency isn't a node in *graph*.
``requires`` references other ops by name and can only be
checked once the full set of ops is known — which is also
why this lives alongside topo-sort rather than at registration
time (decorator order is import order, not dependency order).
"""
for name, deps in graph.items():
for dependency in deps:
if dependency not in graph:
msg = (
f"Operation '{name}' requires '{dependency}', "
f"which is not registered"
)
raise ValueError(msg)