Files
lore-engine-poc-v3/lore_engine_poc/graph_backend.py
Lore Engine Dev 5a8bf67afb slice 6.2: GraphBackend Protocol + InMemoryGraph for Setting/Plane/EXISTS_IN
Extends the GraphBackend Protocol with 6 new methods (add_setting,
find_setting, add_plane, find_plane, planes_in_setting,
add_exists_in, entity_planes, setting_entities). InMemoryGraph
implements them with O(1) reverse lookups (planes_by_setting,
entities_by_setting, settings_by_entity). Neo4jGraph gains
NotImplementedError stubs so isinstance(neo4j, GraphBackend) keeps
passing until the slice 6 follow-up mirrors the Cypher.

EXISTS_IN is the timeless type-assertion per docs/17-planes.md;
time-bounded membership is the slice 6.5 reified :Relation work.

+8 tests (718 → 726). All green. No regressions.
2026-06-19 12:59:16 -04:00

949 lines
40 KiB
Python

"""GraphBackend Protocol + in-memory implementation (slice 5.1).
This module is the seam for slice 5's storage-strategy work
(docs/12-storage-strategy.md). The 36 MCP tools and the
consistency engine today read from an in-memory :class:`Graph`
dataclass held in :mod:`lore_engine_poc.tools`; that class
moves here as :class:`InMemoryGraph` and a Protocol
(:class:`GraphBackend`) defines the contract any backend
(in-memory or Neo4j) must satisfy.
Slice 5.3+ adds :class:`Neo4jGraph` in
:mod:`lore_engine_poc.neo4j_graph` implementing the same
Protocol; slice 5.7 wires ``LORE_GRAPH_BACKEND=neo4j`` through
the MCP entry scripts to select it at startup.
Why a Protocol and not an ABC: the existing
:mod:`lore_engine_poc.llm` module already uses PEP 544
``@runtime_checkable Protocol`` for its :class:`LLMProvider`
seam. Mirroring that pattern keeps the codebase internally
consistent and lets ``isinstance(g, GraphBackend)`` work
without forcing inheritance on the in-memory dataclass.
The in-memory implementation is **byte-identical** to the
slice-0/4/10 :class:`Graph` body that lived in
:mod:`lore_engine_poc.tools`; ``tools.Graph`` is now a
back-compat alias (``Graph = InMemoryGraph``) so the 559
existing tests keep passing without edits. Slice 5.2 migrates
the 40 direct-attribute reads (e.g.
``graph.edges_by_subject[name][rel]``) to method calls so
the dict shape can become private in a later slice.
"""
from __future__ import annotations
from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING, Optional, Protocol, runtime_checkable
from .parsers import LoreSource
if TYPE_CHECKING:
# ``Edge`` is defined in :mod:`lore_engine_poc.tools`, which
# imports ``InMemoryGraph`` from this module. Importing
# ``Edge`` at runtime would form a circular import; the
# ``TYPE_CHECKING`` guard keeps type hints correct without
# paying the import cost (or the cycle).
from .tools import Edge
# ``TemplateSpec`` lives in ``lore_engine_poc.templates.schema``,
# which in turn depends on the Protocol. Same circular-import
# guard.
from .templates.schema import TemplateSpec
# ``Setting`` and ``Plane`` live in :mod:`lore_engine_poc.setting`.
# They are pure dataclasses with no dependency on this module,
# so a real import would be safe — but the TYPE_CHECKING guard
# keeps the lazy-import style consistent with the rest of the
# module.
from .setting import Plane, Setting
# ---------------------------------------------------------------------------
# Protocol — the contract every backend must satisfy
# ---------------------------------------------------------------------------
@runtime_checkable
class GraphBackend(Protocol):
"""The contract the 36 MCP tools + consistency engine rely on.
Methods are kept narrow on purpose: anything tools do
today (read or write) must be expressible as a method
call. Slice 5.2 is the work that makes this true for
the read side (40 sites migrated from dict access to
method calls). Slice 5.5 is the work that makes this
true for the write side on the Neo4j backend.
"""
# -- read side -----------------------------------------------------------
def edges_for_subject(
self, subject: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges where ``subject == subject``. If ``relation``
is provided, only that relation. Insertion order."""
...
def edges_for_object(
self, object_: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges where ``object_`` is one of the two endpoints
(in-memory indexes both subject and object so reverse
traversals are O(1))."""
...
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
"""O(1) id lookup. Returns ``None`` if the id is not in
the graph (e.g. the edge was deleted)."""
...
def by_name(self, name: str) -> Optional[str]:
"""Resolve a name to its canonical form (case-insensitive
+ alias fallback)."""
...
def entities_of_type(self, type_: str) -> set[str]:
"""All entity names tagged with ``type_``."""
...
def lore_source(self, path: str) -> Optional[LoreSource]:
"""The :class:`LoreSource` registered for ``path``, or
``None`` if no source file at that path has been
ingested."""
...
def all_names(self) -> set[str]:
"""Every entity name the graph knows about. The in-memory
implementation exposes ``self.names``; Neo4j needs a
roundtrip to deliver this."""
...
# -- write side ----------------------------------------------------------
def add(self, edge: Edge) -> None:
"""Insert a new edge. Indexes are updated atomically
(in the in-memory backend; via a single transaction
in the Neo4j backend)."""
...
def add_entity_of_type(self, name: str, type_: str) -> None:
"""Tag an entity into the type index without an edge."""
...
def add_lore_source(self, source: LoreSource) -> None:
"""Register a :class:`LoreSource` and add its name to
``names`` (so ``by_name`` can find the source file)."""
...
def replace_edge(self, old_id: str, new_edge: Edge) -> None:
"""The single write chokepoint for :func:`retcon` and
:func:`mark_verified`. Drops the edge with ``old_id``
from the indexes and inserts ``new_edge`` under the
same id. If the subject/relation/object of ``new_edge``
differ from the old, the indexes are re-pointed; if
they're identical, the new edge swaps into the same
list positions in place.
Raises :class:`KeyError` if ``old_id`` is not in the
graph.
"""
...
def remove_entity(self, name: str) -> int:
"""Drop ``name`` and every edge that touches it.
Returns the number of edges removed."""
...
def remove_entity_of_type(self, name: str, type_: str) -> None:
"""Drop ``name`` from the type index only. No edges
are touched."""
...
def rename_entity(self, old: str, new: str) -> int:
"""Rename ``old`` → ``new`` everywhere. The old name
is preserved as an alias of ``new``. Returns the
number of edges re-pointed."""
...
def register_alias(self, canonical: str, alias: str) -> None:
"""Add ``alias`` as an alternative name for ``canonical``.
``by_name(alias)`` then resolves to ``canonical``."""
...
def register_name(self, name: str) -> None:
"""Add ``name`` to the names set without an edge or a
type tag. Used by :func:`build_graph` for entities that
are mentioned in the codex but have no relations yet."""
# -- polymorphic Layer 2 (slice 5T) ----------------------------------
def add_domain_entity(self, entity: dict) -> None:
"""Add (or upsert) a :class:`DomainEntity` node.
``entity`` is a dict with at least ``id``, ``type``,
``name``, and ``setting_id``. ``properties`` is the
free-form, type-checked payload from the template.
The function also tags the entity into the
``DomainEntity`` type bucket so :func:`entities_of_type`
can find it.
"""
...
def add_relation_node(self, relation: dict) -> None:
"""Add a reified :class:`Relation` node connecting two
entities by id. ``relation`` carries ``from_id``,
``to_id``, ``type``, optional ``valid_from`` /
``valid_until`` / ``properties`` / ``sources`` /
``extraction_confidence`` / ``source_confidence``.
Distinct from :func:`add` (the v1 Edge writer) which
indexes by name, not by id — this one indexes by id
so the polymorphic query layer can resolve a template's
declared `to: person_xxx` to the right node.
"""
...
def add_type_template(self, spec: "TemplateSpec") -> None:
"""Add (or upsert) a :class:`TypeTemplate` node carrying
the parsed template spec. The ``TemplateRegistry`` (5T.4)
calls this on :func:`reload`."""
...
def find_type_template(self, template_id: str) -> Optional["TemplateSpec"]:
"""Look up a registered template by id. Returns ``None``
if no template with that id is loaded."""
...
def domain_entities_of_type(self, type_value: str) -> set[str]:
"""Every DomainEntity's name whose ``type`` field equals
``type_value``. Distinct from :func:`entities_of_type`
(which tags by the v1 entity labels like ``Person`` /
``Faction``) — this one tags by the polymorphic
``type_value`` field that the template defines.
"""
...
def find_relation_by_id(self, relation_id: str) -> Optional[dict]:
"""Look up a reified :class:`Relation` node by id."""
...
def relations_for(
self, endpoint_id: str, *, direction: str = "out"
) -> list[dict]:
"""All :class:`Relation` nodes touching ``endpoint_id``.
``direction`` is one of ``"out"`` (the endpoint is the
relation's from-side), ``"in"`` (the endpoint is the
to-side), or ``"both"`` (either side). The in-memory
implementation indexes both directions for O(1) lookup;
the Neo4j backend uses a single traversal in either
direction.
"""
...
def query_cypher(self, body: str, params: dict) -> list[dict]:
"""Run a (caller-validated, allowlist-safe) read-only
Cypher body against this backend and return the rows.
``body`` is the static template query (the registry has
already run it through the Cypher allowlist); ``params``
is the bound-parameter dict from the tool call. Each
row is a dict keyed by the variables the body RETURNs
(e.g. ``{"m": {...}}``).
The in-memory backend implements a *tiny* matcher that
covers the patterns the 4 example templates need;
anything more exotic is a documented follow-up. The
Neo4j backend passes body + params straight to the
driver.
"""
...
...
# -- slice 6.2: Setting + Plane + EXISTS_IN (Layer 1) ----------------
def add_setting(self, setting: "Setting") -> None:
"""Add (or upsert) a :class:`Setting` node.
``setting.id`` is the stable lookup key. The
:class:`Setting` dataclass is defined in
:mod:`lore_engine_poc.setting`; this Protocol method
takes the dataclass directly so the writer and the
reader share one shape.
"""
...
def add_plane(self, plane: "Plane") -> None:
"""Add (or upsert) a :class:`Plane` node.
``plane.id`` is the stable lookup key. ``plane.setting_id``
must reference an existing :class:`Setting`'s id, but the
in-memory backend does not enforce this at write time
(it can be checked at migration time, per slice 6.4).
"""
...
def find_setting(self, setting_id: str) -> Optional["Setting"]:
"""Look up a registered :class:`Setting` by id."""
...
def find_plane(self, plane_id: str) -> Optional["Plane"]:
"""Look up a registered :class:`Plane` by id."""
...
def planes_in_setting(self, setting_id: str) -> list["Plane"]:
"""All :class:`Plane` nodes whose ``setting_id`` matches,
in insertion order."""
...
def add_exists_in(self, *, entity_id: str, setting_id: str) -> None:
"""Record the timeless ``EXISTS_IN`` fact
``entity_id EXISTS_IN setting_id``.
Per ``docs/17-planes.md``, ``EXISTS_IN`` is the timeless
type-assertion that an entity belongs to a Setting. The
typed edge is added to ``EDGE_TYPES`` (slice 6.1); this
Protocol method captures the *fact* so reverse lookups
(``entity_planes``, ``setting_entities``) are O(1).
Idempotent: re-adding the same ``(entity_id, setting_id)``
pair is a no-op.
"""
...
def entity_planes(self, entity_id: str) -> list[str]:
"""All setting ids the given entity has an ``EXISTS_IN``
fact for, in insertion order."""
...
def setting_entities(self, setting_id: str) -> set[str]:
"""All entity ids that have an ``EXISTS_IN`` fact for the
given setting, as a set (membership is not ordered — use
:func:`entities_of_type` for a specific label)."""
...
# ---------------------------------------------------------------------------
# InMemoryGraph — the renamed slice-0/4/10 Graph dataclass
# ---------------------------------------------------------------------------
@dataclass
class InMemoryGraph:
"""In-memory graph: name -> {relation -> [Edge, ...]}.
Slice 4.0 added two reverse-direction indexes to make
reverse-traversal tools O(1):
* ``edges_by_object`` — name -> [Edge, ...] indexed by the
edge's *object*. Lets tools like ``ancestors_of(person)``
or ``members_of(faction)`` answer "who points at X?" in a
single lookup instead of a full subject-side scan.
* ``entities_by_type`` — type -> {name, ...} for the
slice-0 ``Entity.type`` strings (``"npc"``, ``"faction"``,
``"location"``, etc.). Populated by ``build_graph`` for
every entity in the input. Read tools that need
"all factions" / "all locations" / "all NPCs" consult this
index.
Both indexes are **additive** — graphs built the slice-0/1/2
way (without these fields) are still valid; tools that need
the new indexes fall back to a full scan when they're empty.
"""
edges_by_subject: dict[str, dict[str, list[Edge]]] = field(default_factory=dict)
names: set[str] = field(default_factory=set)
# Side index: every LoreSource (markdown file or YAML file) keyed
# by its full path. Populated by :func:`build_graph` whenever it
# sees an :class:`Entity` (markdown path) or a ``_LORESOURCE_NODE``
# marker triple (structured-YAML path). Slice 1.3 makes the
# LoreSource a first-class node per AC 1.9, 1.10.
lore_sources: dict[str, LoreSource] = field(default_factory=dict)
# Slice 4.0 reverse indexes — see class docstring.
edges_by_object: dict[str, list[Edge]] = field(default_factory=dict)
entities_by_type: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.0 — alternative names for entities (set by
# ``set_alias``). A name → set[alias, ...] map; the alias
# resolves to the canonical name via :func:`resolve_alias`.
aliases: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.2 — id → edge reverse index, populated by
# :func:`add`. Lets the retcon / mark_verified write tools
# find a specific edge by its stable id in O(1). The
# subject / object indexes are still authoritative for
# query-style lookups.
edges_by_id: dict[str, Edge] = field(default_factory=dict)
# Slice 5T.1 — polymorphic Layer 2 storage.
# Domain entities keyed by id; the value is the full entity
# dict. Names are added to ``names`` and to the
# ``DomainEntity`` type bucket so :func:`by_name` and
# :func:`entities_of_type` can find them through the v1
# surface.
domain_entities: dict[str, dict] = field(default_factory=dict)
# Reified :Relation nodes keyed by id; the value is the
# relation dict. Two endpoint indexes for O(1) traversal
# in both directions (mirrors edges_by_object for v1).
relation_nodes: dict[str, dict] = field(default_factory=dict)
relation_nodes_by_endpoint: dict[str, set[str]] = field(default_factory=dict)
# Polymorphic type bucket: ``type_value -> {name, ...}``
# populated by :func:`add_domain_entity`. Distinct from
# ``entities_by_type`` (which is the v1 label bucket).
domain_entities_by_type: dict[str, set[str]] = field(default_factory=dict)
# :TypeTemplate storage keyed by template id.
type_templates: dict[str, "TemplateSpec"] = field(default_factory=dict)
# Slice 6.2 — Setting + Plane + EXISTS_IN (Layer 1) storage.
# ``settings`` is keyed by Setting.id; ``planes`` is keyed
# by Plane.id; the ``planes_by_setting`` index is the
# reverse lookup that powers ``planes_in_setting(...)``.
# ``exists_in`` is a set of (entity_id, setting_id) tuples
# — the timeless type-assertion per docs/17-planes.md —
# plus two endpoint indexes (``entities_by_setting``,
# ``settings_by_entity``) so reverse lookups stay O(1).
settings: dict[str, "Setting"] = field(default_factory=dict)
planes: dict[str, "Plane"] = field(default_factory=dict)
planes_by_setting: dict[str, list[str]] = field(default_factory=dict)
exists_in: set[tuple[str, str]] = field(default_factory=set)
entities_by_setting: dict[str, set[str]] = field(default_factory=dict)
settings_by_entity: dict[str, list[str]] = field(default_factory=dict)
# -- read side -----------------------------------------------------------
def edges_for_subject(
self, subject: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges where ``subject == subject``. If ``relation``
is provided, only that relation. Insertion order."""
rel_map = self.edges_by_subject.get(subject, {})
if relation is None:
out: list[Edge] = []
for edges in rel_map.values():
out.extend(edges)
return out
return list(rel_map.get(relation, []))
def edges_for_object(
self, object_: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges where ``object_`` is one of the two endpoints
(in-memory indexes both subject and object so reverse
traversals are O(1))."""
edges = self.edges_by_object.get(object_, [])
if relation is None:
return list(edges)
return [e for e in edges if e.relation == relation]
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
"""O(1) lookup by stable per-edge id. Returns ``None`` if
the id isn't in the graph (e.g. the edge was deleted)."""
return self.edges_by_id.get(edge_id)
def by_name(self, name: str) -> Optional[str]:
"""Resolve a name to a canonical form (case-insensitive).
Also follows aliases: if ``name`` matches an alias, the
canonical name is returned.
"""
if name in self.names:
return name
low = name.lower()
for n in self.names:
if n.lower() == low:
return n
aliased = self.resolve_alias(name)
if aliased is not None:
return aliased
return None
def entities_of_type(self, type_: str) -> set[str]:
"""All entity names tagged with ``type_``."""
return set(self.entities_by_type.get(type_, set()))
def all_entity_types(self) -> list[str]:
"""All type buckets the graph knows about. Useful for
migrations that need to iterate every type (e.g.
``update_entity`` relabelling)."""
return list(self.entities_by_type.keys())
def lore_source(self, path: str) -> Optional[LoreSource]:
return self.lore_sources.get(path)
def all_names(self) -> set[str]:
return set(self.names)
# -- write side ----------------------------------------------------------
def add(self, edge: Edge) -> None:
self.names.add(edge.subject)
self.names.add(edge.object)
self.edges_by_subject.setdefault(edge.subject, {}).setdefault(
edge.relation, []
).append(edge)
# Maintain the reverse-direction index. We index *every*
# edge by both endpoints so "who points at X" is O(1).
self.edges_by_object.setdefault(edge.object, []).append(edge)
self.edges_by_object.setdefault(edge.subject, []).append(edge)
# Id index. If two edges with the same id are added, the
# second one wins — this is defensive; the rest of the
# graph assumes edge ids are unique.
self.edges_by_id[edge.edge_id] = edge
def add_entity_of_type(self, name: str, type_: str) -> None:
"""Tag an entity into the type index without adding an edge.
Read tools that need to know "what type is X?" or "give me
all X's" populate this directly. ``build_graph`` calls
this for every :class:`Entity` it sees.
"""
self.names.add(name)
self.entities_by_type.setdefault(type_, set()).add(name)
def add_lore_source(self, source: LoreSource) -> None:
"""Register a :class:`LoreSource`. Slice 1.3 makes the
LoreSource a first-class node per AC 1.9, 1.10; the
source's ``name`` is added to ``names`` so ``by_name``
can find the source file too.
"""
self.lore_sources[source.path] = source
if source.name:
self.names.add(source.name)
def replace_edge(self, old_id: str, new_edge: Edge) -> None:
"""Single chokepoint for :func:`retcon` and
:func:`mark_verified`. Lifts the inlined index surgery
that used to live in ``write_tools.py:447-481, 532-541``
so any backend (in-memory or Neo4j) can implement the
same semantics.
Behaviour:
* If ``new_edge`` has the same ``subject / relation /
object`` as the old edge, swap the old reference out
for the new one in the subject and object lists, and
update the id index. The id stays the same.
* If the subject, relation, or object differs, drop
the old edge from the subject / object indexes and
add the new one under its new identity. The id still
stays the same.
* ``new_edge.edge_id`` is the id we keep; if it differs
from ``old_id``, the contract is violated and this
raises :class:`ValueError`.
Raises :class:`KeyError` if ``old_id`` is not in the
graph.
"""
old = self.edges_by_id.get(old_id)
if old is None:
raise KeyError(f"replace_edge: id {old_id!r} not in graph")
if new_edge.edge_id != old_id:
raise ValueError(
f"replace_edge: new_edge.edge_id={new_edge.edge_id!r} "
f"differs from old_id={old_id!r}; ids must match"
)
if (
old.subject == new_edge.subject
and old.relation == new_edge.relation
and old.object == new_edge.object
):
# In-place swap under the same identity.
for rel_map in self.edges_by_subject.values():
for edges in rel_map.values():
for i, e in enumerate(edges):
if e.edge_id == old_id:
edges[i] = new_edge
for endpoint, edges in self.edges_by_object.items():
for i, e in enumerate(edges):
if e.edge_id == old_id:
edges[i] = new_edge
self.edges_by_id[old_id] = new_edge
return
# Subject / relation / object differ — drop from the old
# indexes and re-add under the new identity.
self._remove_edge_from_indexes(old_id, old)
self.add(new_edge)
def _remove_edge_from_indexes(self, edge_id: str, edge: Edge) -> None:
"""Internal helper: drop ``edge`` (looked up by id) from
all four indexes. Used by :func:`replace_edge` and
:func:`remove_entity`."""
if edge.subject in self.edges_by_subject:
old_rels = self.edges_by_subject[edge.subject]
if edge.relation in old_rels:
old_list = old_rels[edge.relation]
old_list[:] = [e for e in old_list if e.edge_id != edge_id]
if not old_list:
del old_rels[edge.relation]
if not old_rels:
del self.edges_by_subject[edge.subject]
for endpoint in (edge.object, edge.subject):
if endpoint in self.edges_by_object:
obj_list = self.edges_by_object[endpoint]
obj_list[:] = [e for e in obj_list if e.edge_id != edge_id]
if not obj_list:
del self.edges_by_object[endpoint]
self.edges_by_id.pop(edge_id, None)
def remove_entity(self, name: str) -> int:
"""Remove ``name`` and every edge that touches it.
Cascades through ``edges_by_subject`` and ``edges_by_object``
so no dangling references remain. Returns the number of
edges removed. Used by the slice-10 ``delete_entity`` tool.
"""
# Collect the ids we're about to drop so we can update
# the ``edges_by_id`` index in one pass.
ids_to_drop = {
e.edge_id
for edges in self.edges_by_subject.values()
for edge_list in edges.values()
for e in edge_list
if e.subject == name or e.object == name
}
# Count first, then strip. We rebuild the indexes without
# ``name`` because Edge is immutable (dataclass) and the
# indexes are plain dicts.
removed = 0
for subject, rel_map in list(self.edges_by_subject.items()):
for rel, edges in list(rel_map.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
removed += len(edges) - len(survivors)
rel_map[rel] = survivors
if not survivors:
del rel_map[rel]
if not rel_map:
del self.edges_by_subject[subject]
for endpoint, edges in list(self.edges_by_object.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
self.edges_by_object[endpoint] = survivors
if not survivors:
del self.edges_by_object[endpoint]
for eid in ids_to_drop:
self.edges_by_id.pop(eid, None)
self.names.discard(name)
for type_, members in list(self.entities_by_type.items()):
members.discard(name)
if not members:
del self.entities_by_type[type_]
self.aliases.pop(name, None)
return removed
def remove_entity_of_type(self, name: str, type_: str) -> None:
"""Drop ``name`` from the type index only. No edges
are touched. Used by ``update_entity`` when a type
tag is being demoted without removing the entity."""
members = self.entities_by_type.get(type_)
if members is not None:
members.discard(name)
if not members:
self.entities_by_type.pop(type_, None)
def rename_entity(self, old: str, new: str) -> int:
"""Rename ``old`` → ``new`` in the names set, type index,
and every edge endpoint. Returns the number of edges
re-pointed. Used by the slice-10 ``update_entity`` tool."""
if old == new:
return 0
# Names + types
if old in self.names:
self.names.discard(old)
self.names.add(new)
for type_, members in self.entities_by_type.items():
if old in members:
members.discard(old)
members.add(new)
# Re-point edges. We rebuild edges_by_subject and
# edges_by_object because Edge objects themselves are
# immutable (dataclass) — replacing them is the cleanest
# path. For the POC's typical world size this is fine.
re_pointed = 0
# Subject index: keys are subjects; values are dicts of
# relation → list[Edge]. Some subjects == old.
if old in self.edges_by_subject:
sub_edges = self.edges_by_subject.pop(old)
self.edges_by_subject.setdefault(new, {})
for rel, edges in sub_edges.items():
for e in edges:
e_new = replace(e, subject=new)
self.edges_by_subject[new].setdefault(rel, []).append(e_new)
# Re-key the id index to the replaced edge so
# retcon / mark_verified can still find it.
self.edges_by_id[e.edge_id] = e_new
re_pointed += 1
# Object index: values are list[Edge]; the key is endpoint
# (object name or subject name, since we index both).
new_obj_index: dict[str, list[Edge]] = {}
for endpoint, edges in self.edges_by_object.items():
target = new if endpoint == old else endpoint
for e in edges:
if e.subject == old or e.object == old:
e_new = e
if e.subject == old:
e_new = replace(e_new, subject=new)
if e.object == old:
e_new = replace(e_new, object=new)
new_obj_index.setdefault(target, []).append(e_new)
# Same id index maintenance for the object-side
# rename path.
self.edges_by_id[e.edge_id] = e_new
else:
new_obj_index.setdefault(target, []).append(e)
# Merge: also rebuild subject index for object-side renames
# (edges where old was the object, not the subject).
self.edges_by_object = new_obj_index
# Re-derive edges_by_subject from edges_by_object for any
# edge where old was the *object*.
for endpoint, edges in new_obj_index.items():
for e in edges:
if e.subject == new:
self.edges_by_subject.setdefault(new, {}).setdefault(
e.relation, []
)
# Avoid duplicates from the subject-side rebuild.
if e not in self.edges_by_subject[new][e.relation]:
self.edges_by_subject[new][e.relation].append(e)
# Aliases: the rename preserves the old name as an alias
# pointing at the new canonical. This always happens —
# the world-builder may later call ``merge_entities``
# which relies on the alias being registered to find
# the canonical form via ``by_name``.
old_aliases = self.aliases.pop(old, set())
alias_set = self.aliases.setdefault(new, set())
if old_aliases:
alias_set.update(old_aliases)
alias_set.add(old)
return re_pointed
def register_alias(self, canonical: str, alias: str) -> None:
"""Add ``alias`` as an alternative name for ``canonical``.
The canonical name does not need to be in the names set
for the alias to register (callers may register an
alias before the canonical is materialized). However,
:func:`by_name` will not resolve via the alias unless
the canonical is in the names set — the alias is only
consulted as a fallback.
"""
if alias == canonical:
return
self.aliases.setdefault(canonical, set()).add(alias)
def register_name(self, name: str) -> None:
"""Add ``name`` to the names set without an edge or type
tag. Used by :func:`build_graph` for entities that are
mentioned in the codex but have no relations yet."""
self.names.add(name)
def resolve_alias(self, alias: str) -> Optional[str]:
"""If ``alias`` is registered as an alternative name for
some canonical entity, return the canonical name. Otherwise
return ``None``. Used by the slice-10 ``set_alias`` /
``update_entity`` read paths."""
low = alias.lower()
for canonical, alias_set in self.aliases.items():
if alias in alias_set or alias.lower() in {a.lower() for a in alias_set}:
return canonical
return None
# -- slice 5T.1: polymorphic Layer 2 --------------------------------
def add_domain_entity(self, entity: dict) -> None:
"""Add (or upsert) a :class:`DomainEntity` node.
Required keys: ``id``, ``type``, ``name``, ``setting_id``.
``properties`` is a free-form dict; ``summary`` is a
text field for embedding. ``sources`` is a list of
source paths that produced the entity.
"""
eid = entity.get("id")
name = entity.get("name")
type_value = entity.get("type")
if not eid or not name or not type_value:
raise ValueError(
f"add_domain_entity: id/type/name are required, got {entity!r}"
)
# If the entity already exists under a different id (a
# write_tools caller may have used a slug-id as the
# entity id), we still upsert in place.
self.domain_entities[eid] = entity
self.names.add(name)
# Tag the v1 DomainEntity label bucket so
# entities_of_type('DomainEntity') finds it (the LLM
# might use that for "give me all polymorphic nodes").
self.entities_by_type.setdefault("DomainEntity", set()).add(name)
# And the polymorphic type bucket keyed on the
# template-defined ``type`` field.
self.domain_entities_by_type.setdefault(type_value, set()).add(name)
def add_relation_node(self, relation: dict) -> None:
"""Add a reified :class:`Relation` node.
Required keys: ``id``, ``from_id``, ``to_id``, ``type``.
The two endpoint names referenced by ``from_id`` /
``to_id`` are auto-registered so :func:`by_name` and
:func:`all_names` find them — this is how template
instances that reference bare slugs (``person_vex_silent``)
become first-class nodes the rest of the engine can
query.
"""
rid = relation.get("id")
from_id = relation.get("from_id")
to_id = relation.get("to_id")
rel_type = relation.get("type")
if not rid or not from_id or not to_id or not rel_type:
raise ValueError(
f"add_relation_node: id/from_id/to_id/type are required, got {relation!r}"
)
self.relation_nodes[rid] = relation
# Index both endpoints. The endpoint key is the *id*,
# not the name — the registry/template layer translates
# ``to: person_vex_silent`` to the matching id at
# ingest time. We also accept names (deferred) for
# forward-compat.
for endpoint in (from_id, to_id):
self.relation_nodes_by_endpoint.setdefault(endpoint, set()).add(rid)
# Auto-register the endpoint ids as names so
# by_name and all_names find them.
for endpoint_id in (from_id, to_id):
if endpoint_id not in self.names:
self.names.add(endpoint_id)
self.entities_by_type.setdefault("DomainEntity", set()).add(endpoint_id)
def add_type_template(self, spec: "TemplateSpec") -> None:
"""Add (or upsert) a :class:`TypeTemplate` node."""
if not spec.id:
raise ValueError("add_type_template: spec.id is required")
self.type_templates[spec.id] = spec
def find_type_template(self, template_id: str) -> Optional["TemplateSpec"]:
return self.type_templates.get(template_id)
def domain_entities_of_type(self, type_value: str) -> set[str]:
"""Every DomainEntity's name whose ``type`` field equals
``type_value``. Distinct from :func:`entities_of_type`
(which tags by the v1 entity labels)."""
return set(self.domain_entities_by_type.get(type_value, set()))
def find_relation_by_id(self, relation_id: str) -> Optional[dict]:
return self.relation_nodes.get(relation_id)
def relations_for(
self, endpoint_id: str, *, direction: str = "out"
) -> list[dict]:
"""All :class:`Relation` nodes touching ``endpoint_id``."""
if direction not in ("out", "in", "both"):
raise ValueError(
f"relations_for: direction must be 'out', 'in', or 'both', got {direction!r}"
)
ids = self.relation_nodes_by_endpoint.get(endpoint_id, set())
rows = [self.relation_nodes[rid] for rid in ids]
if direction == "both":
return rows
if direction == "out":
return [r for r in rows if r["from_id"] == endpoint_id]
# direction == "in"
return [r for r in rows if r["to_id"] == endpoint_id]
# -- slice 6.2: Setting + Plane + EXISTS_IN (Layer 1) --------------
def add_setting(self, setting: "Setting") -> None:
"""Add (or upsert) a :class:`Setting`. Idempotent on id.
Per ``docs/17-planes.md``, the Setting's id is the
stable lookup key — display names / kinds are
non-contract metadata and may be revised without
re-keying the graph.
"""
self.settings[setting.id] = setting
def find_setting(self, setting_id: str) -> Optional["Setting"]:
"""Look up a registered :class:`Setting` by id."""
return self.settings.get(setting_id)
def add_plane(self, plane: "Plane") -> None:
"""Add (or upsert) a :class:`Plane`. Idempotent on id.
Maintains the ``planes_by_setting`` index in
insertion order so :func:`planes_in_setting` is O(1).
Re-adding an existing id is a no-op for the index —
the original setting_id and insertion-order position
are preserved. (Moving a plane between settings is a
slice 6.4 migration concern, not a write-path concern.)
"""
if plane.id in self.planes:
# Preserve original setting_id + index position.
return
self.planes[plane.id] = plane
self.planes_by_setting.setdefault(plane.setting_id, []).append(plane.id)
def find_plane(self, plane_id: str) -> Optional["Plane"]:
"""Look up a registered :class:`Plane` by id."""
return self.planes.get(plane_id)
def planes_in_setting(self, setting_id: str) -> list["Plane"]:
"""All :class:`Plane` nodes whose ``setting_id`` matches,
in insertion order. Empty list if the setting has no
planes or doesn't exist."""
return [
self.planes[pid]
for pid in self.planes_by_setting.get(setting_id, [])
]
def add_exists_in(self, *, entity_id: str, setting_id: str) -> None:
"""Record the timeless ``EXISTS_IN`` fact. Idempotent.
The :class:`Setting` is *not* required to exist when
``add_exists_in`` is called — the in-memory backend
stores the fact and lets the slice 6.4 migration's
backfill materialise missing Setting nodes. This
matches the design's separation of concerns: the
``EXISTS_IN`` edge is a fact about the entity, the
Setting node is a structural placeholder.
"""
key = (entity_id, setting_id)
if key in self.exists_in:
return # idempotent
self.exists_in.add(key)
self.entities_by_setting.setdefault(setting_id, set()).add(entity_id)
self.settings_by_entity.setdefault(entity_id, []).append(setting_id)
def entity_planes(self, entity_id: str) -> list[str]:
"""All setting ids the given entity has an ``EXISTS_IN``
fact for, in insertion order. Empty list if the entity
has no memberships."""
return list(self.settings_by_entity.get(entity_id, []))
def setting_entities(self, setting_id: str) -> set[str]:
"""All entity ids that have an ``EXISTS_IN`` fact for the
given setting, as a set. Empty set if the setting has
no memberships."""
return set(self.entities_by_setting.get(setting_id, set()))
def query_cypher(self, body: str, params: dict) -> list[dict]:
"""Run a (caller-validated) read-only Cypher body against
the in-memory graph and return the rows.
This is *not* a full Cypher engine. It supports the
patterns the 4 example templates need:
* ``MATCH (n:DomainEntity {type: 'X', k: $p}) RETURN n``
* ``OPTIONAL MATCH (n)-[r]->(m)`` chained clauses
* ``WHERE n.k = $p`` filters
* ``RETURN n`` or ``RETURN m``
* ``ORDER BY n.k [ASC|DESC]``
* ``LIMIT N`` and ``SKIP N``
The allowlist (slice 5T.3) is the *only* safety
boundary — by the time we get here, the body has
already been validated. The matcher's job is to
execute the body faithfully against the in-memory
state, not to enforce the allowlist again.
Anything more exotic raises :class:`NotImplementedError`
with a clear message; expanding the matcher is a
documented follow-up.
"""
# Lazy import to keep the matcher code co-located with
# the cypher_allowlist module (which lives in the
# templates subpackage and imports back into
# graph_backend via the Protocol type hint). A
# top-level import here would form a cycle.
from .templates.cypher_runtime import run_in_memory
return run_in_memory(body, params, self)