Source code for foundry.scope

"""Scope discovery from Pydantic config models.

A scope represents a level in the config tree at which
operations can run.  Scopes are declared *explicitly* on the
config schema with the :class:`Scoped` ``Annotated``-metadata
marker::

   class ProjectConfig(FoundryConfig):
       apps: Annotated[list[App], Scoped(name="app")] = Field(
           default_factory=list,
       )

Each such field becomes a scope, and every item in the list is one
scope instance the engine iterates over.  Unannotated
``list[BaseModel]`` fields are treated as ordinary data, not
scopes — so target authors can carry lists of nested models
without accidentally creating new scope levels.

Scopes form a tree rooted at :data:`PROJECT`.  A child scope's
instances are resolved from its parent scope instance by walking
:attr:`Scope.resolve_path` — a tuple of attribute names.  For a
directly-annotated field this is just ``(field_name,)``; for
fields that sit inside an intermediate non-list ``BaseModel``
wrapper (e.g. ``App.config.resources``) the path is the full
attribute walk, e.g. ``("config", "resources")``.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from pydantic import BaseModel

if TYPE_CHECKING:
    from collections.abc import Iterator

    from pydantic.fields import FieldInfo


@dataclass(frozen=True)
class Scoped:
    """``Annotated`` marker tagging a list field as a foundry scope.

    Attach to a ``list[SomeBaseModel]`` field on a
    :class:`~foundry.config.FoundryConfig` subclass to declare
    that the field defines a scope level.  Only marked fields
    produce scopes; unmarked lists are plain data.

    Attributes:
        name: The scope's name, e.g. ``"app"`` or ``"resource"``.
            Required — field names on configs are conventionally
            plural (``apps``, ``resources``), but a scope refers
            to one instance, so the name is spelled out
            explicitly rather than derived by a heuristic.

    """

    name: str


[docs] @dataclass(frozen=True) class Scope: """A named level in the config tree. Attributes: name: Human-readable scope name, e.g. ``"resource"``. config_key: The config field name that produced this scope, e.g. ``"resources"``. Empty string for the root (project) scope. parent: The parent scope, or ``None`` for the root. resolve_path: Dotted attribute path from ``parent``'s scope instance to this scope's list of items. Empty for the root scope. Defaults to ``(config_key,)`` for a direct child. """ name: str config_key: str parent: Scope | None = None resolve_path: tuple[str, ...] = field(default=()) def __post_init__(self) -> None: """Enforce that only the root ``"project"`` scope is parentless.""" if self.parent is None and self.name != "project": msg = ( f"Scope {self.name!r} has no parent; only the root " f"'project' scope may be parentless" ) raise ValueError(msg)
# The root scope — always present. PROJECT = Scope(name="project", config_key="") class ScopeTree(tuple[Scope, ...]): """Flat collection of scopes with convenience lookups. Subclassing :class:`tuple` gives callers the usual iteration/index/`len` ergonomics for free, while the two methods below handle the recurring "children of X" and "scope for id Y" patterns so the engine, store, and assembler don't each reinvent the search. Construct from :func:`discover_scopes` output:: tree = ScopeTree(discover_scopes(MyConfig)) """ __slots__ = () def children_of(self, parent: Scope) -> list[Scope]: """Return direct children of *parent*, in discovery order.""" return [scope for scope in self if scope.parent is parent] def scope_for(self, instance_id: str) -> Scope: """Return the :class:`Scope` an ``instance_id`` belongs to. Instance ids produced by the engine are dot-joined paths of the form ``"project.<config_key>.<index>..."``. Each ``config_key`` maps to exactly one child scope at the current level, so the scope is recovered by walking the tree from :data:`PROJECT` using segment pairs. Args: instance_id: A dot-path instance id (e.g. ``"project.apps.0.resources.2"``). Returns: The :class:`Scope` the id terminates at. Raises: ValueError: If the id doesn't start with ``"project"`` or references a ``config_key`` not present in this tree. """ segments = instance_id.split(".") if segments[0] != "project": msg = f"Instance id {instance_id!r} must start with 'project'" raise ValueError(msg) current = PROJECT for i in range(1, len(segments), 2): config_key = segments[i] try: current = next( scope for scope in self if scope.parent is current and scope.config_key == config_key ) except StopIteration as exc: msg = ( f"Instance id {instance_id!r} references config_key " f"{config_key!r}, which is not a child of " f"{current.name!r}" ) raise ValueError(msg) from exc return current
[docs] def discover_scopes( config_cls: type[BaseModel], ) -> ScopeTree: """Derive scopes from a Pydantic model's :class:`Scoped` markers. The top-level config is always the ``"project"`` scope. Each field declared ``Annotated[list[T], Scoped()]`` becomes a child scope of the current level; the item type ``T`` is then itself descended into to discover grandchild scopes. Non-list ``BaseModel`` fields (e.g. ``App.config``) are traversed transparently: their nested :class:`Scoped` fields become scopes rooted at the enclosing level, with ``resolve_path`` reflecting the full attribute walk. Each scoped item type is descended into at most once, so discovery always terminates. If the same type appears in multiple scoped lists only the first occurrence is descended — subsequent ones still produce their own scope but no grandchildren. Args: config_cls: The Pydantic model class to inspect. Returns: :class:`ScopeTree` containing every discovered scope, project first. """ return ScopeTree((PROJECT, *_discover(config_cls, PROJECT, (), set())))
def _discover( cls: type[BaseModel], parent: Scope, prefix: tuple[str, ...], seen: set[type[BaseModel]], ) -> Iterator[Scope]: """Walk *cls*'s fields and yield discovered scopes. Two descent modes live here: - ``Scoped`` list fields yield a scope and recurse into the item type with a fresh ``prefix`` and the new scope as ``parent`` (gated by *seen* to break cycles). - Wrapper (non-list ``BaseModel``) fields recurse transparently with an extended ``prefix`` and the same ``parent``, so scoped lists nested inside wrappers surface with a compound ``resolve_path`` but without creating extra scope levels. """ for name, info in cls.model_fields.items(): marker = next( ( potential_marker for potential_marker in info.metadata if isinstance(potential_marker, Scoped) ), None, ) if marker: # Scope boundary: emit the scope, then descend into the # list's item type with a fresh prefix and the new scope # as parent. Gated by ``seen`` so recursive types # terminate. item_cls = _extract_base_model_from_scoped(cls, name, info) child = Scope( name=marker.name, config_key=name, parent=parent, resolve_path=(*prefix, name), ) yield child if item_cls not in seen: seen |= {item_cls} yield from _discover(item_cls, child, (), seen) else: # Organizational wrapper (non-list BaseModel): no scope # here, but walk in with an extended prefix so scoped # lists nested inside surface with the full attribute # path in ``resolve_path``. ann = info.annotation if isinstance(ann, type) and issubclass(ann, BaseModel): yield from _discover(ann, parent, (*prefix, name), seen) def _extract_base_model_from_scoped( cls: type[BaseModel], name: str, info: FieldInfo, ) -> type[BaseModel]: """Return the item type ``T`` from a :class:`Scoped`-marked ``list[T]``. Raises :class:`TypeError` if the annotation is not ``list[BaseModel]`` — the marker only makes sense there. """ annotation = info.annotation if getattr(annotation, "__origin__", None) is list: args: tuple[type, ...] = getattr(annotation, "__args__", ()) item = args[0] if args else None if isinstance(item, type) and issubclass(item, BaseModel): return item msg = ( f"{cls.__name__}.{name} is annotated Scoped() but its " f"type is not list[BaseModel]: {annotation!r}" ) raise TypeError(msg)