Extends the GraphBackend Protocol with 8 new methods (add_setting, find_setting, add_plane, find_plane, planes_in_setting, add_exists_in, entity_planes, setting_entities). InMemoryGraph implements them with O(1) reverse lookups (planes_by_setting, entities_by_setting, settings_by_entity). Neo4jGraph gains NotImplementedError stubs so isinstance(neo4j, GraphBackend) keeps passing until the slice 6 follow-up mirrors the Cypher. EXISTS_IN is the timeless type-assertion per docs/17-planes.md; time-bounded membership is the slice 6.5 reified :Relation work. +8 tests (718 → 726). All green. No regressions.
949 lines
40 KiB
Python
949 lines
40 KiB
Python
"""GraphBackend Protocol + in-memory implementation (slice 5.1).
|
|
|
|
This module is the seam for slice 5's storage-strategy work
|
|
(docs/12-storage-strategy.md). The 36 MCP tools and the
|
|
consistency engine today read from an in-memory :class:`Graph`
|
|
dataclass held in :mod:`lore_engine_poc.tools`; that class
|
|
moves here as :class:`InMemoryGraph` and a Protocol
|
|
(:class:`GraphBackend`) defines the contract any backend
|
|
(in-memory or Neo4j) must satisfy.
|
|
|
|
Slice 5.3+ adds :class:`Neo4jGraph` in
|
|
:mod:`lore_engine_poc.neo4j_graph` implementing the same
|
|
Protocol; slice 5.7 wires ``LORE_GRAPH_BACKEND=neo4j`` through
|
|
the MCP entry scripts to select it at startup.
|
|
|
|
Why a Protocol and not an ABC: the existing
|
|
:mod:`lore_engine_poc.llm` module already uses PEP 544
|
|
``@runtime_checkable Protocol`` for its :class:`LLMProvider`
|
|
seam. Mirroring that pattern keeps the codebase internally
|
|
consistent and lets ``isinstance(g, GraphBackend)`` work
|
|
without forcing inheritance on the in-memory dataclass.
|
|
|
|
The in-memory implementation is **byte-identical** to the
|
|
slice-0/4/10 :class:`Graph` body that lived in
|
|
:mod:`lore_engine_poc.tools`; ``tools.Graph`` is now a
|
|
back-compat alias (``Graph = InMemoryGraph``) so the 559
|
|
existing tests keep passing without edits. Slice 5.2 migrates
|
|
the 40 direct-attribute reads (e.g.
|
|
``graph.edges_by_subject[name][rel]``) to method calls so
|
|
the dict shape can become private in a later slice.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field, replace
|
|
from typing import TYPE_CHECKING, Optional, Protocol, runtime_checkable
|
|
|
|
from .parsers import LoreSource
|
|
|
|
if TYPE_CHECKING:
|
|
# ``Edge`` is defined in :mod:`lore_engine_poc.tools`, which
|
|
# imports ``InMemoryGraph`` from this module. Importing
|
|
# ``Edge`` at runtime would form a circular import; the
|
|
# ``TYPE_CHECKING`` guard keeps type hints correct without
|
|
# paying the import cost (or the cycle).
|
|
from .tools import Edge
|
|
# ``TemplateSpec`` lives in ``lore_engine_poc.templates.schema``,
|
|
# which in turn depends on the Protocol. Same circular-import
|
|
# guard.
|
|
from .templates.schema import TemplateSpec
|
|
# ``Setting`` and ``Plane`` live in :mod:`lore_engine_poc.setting`.
|
|
# They are pure dataclasses with no dependency on this module,
|
|
# so a real import would be safe — but the TYPE_CHECKING guard
|
|
# keeps the lazy-import style consistent with the rest of the
|
|
# module.
|
|
from .setting import Plane, Setting
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol — the contract every backend must satisfy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@runtime_checkable
class GraphBackend(Protocol):
    """Storage contract shared by the 36 MCP tools and the consistency engine.

    The surface is intentionally narrow: whatever the tools read or
    write must be expressible as one of these method calls. Slice 5.2
    gets the read side there (40 direct dict accesses become method
    calls); slice 5.5 does the same for the write side on the Neo4j
    backend.
    """

    # -- read side -----------------------------------------------------------

    def edges_for_subject(
        self, subject: str, relation: Optional[str] = None
    ) -> list[Edge]:
        """Return the edges whose subject is ``subject``, in insertion
        order. When ``relation`` is given, only edges of that relation
        are returned."""
        ...

    def edges_for_object(
        self, object_: str, relation: Optional[str] = None
    ) -> list[Edge]:
        """Return the edges that have ``object_`` as one of their two
        endpoints (the in-memory backend indexes both subject and
        object so reverse traversals are O(1))."""
        ...

    def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
        """Look an edge up by id in O(1). ``None`` means the id is not
        in the graph (e.g. the edge was deleted)."""
        ...

    def by_name(self, name: str) -> Optional[str]:
        """Resolve ``name`` to its canonical form: case-insensitive
        match first, alias lookup as the fallback."""
        ...

    def entities_of_type(self, type_: str) -> set[str]:
        """Return every entity name tagged with ``type_``."""
        ...

    def lore_source(self, path: str) -> Optional[LoreSource]:
        """Return the :class:`LoreSource` registered for ``path``, or
        ``None`` when no source file at that path has been ingested."""
        ...

    def all_names(self) -> set[str]:
        """Return every entity name the graph knows about. The
        in-memory backend exposes ``self.names``; Neo4j needs a
        roundtrip to deliver this."""
        ...

    # -- write side ----------------------------------------------------------

    def add(self, edge: Edge) -> None:
        """Insert a new edge. Indexes are updated atomically (directly
        in the in-memory backend; via a single transaction in the
        Neo4j backend)."""
        ...

    def add_entity_of_type(self, name: str, type_: str) -> None:
        """Tag ``name`` into the type index without creating an edge."""
        ...

    def add_lore_source(self, source: LoreSource) -> None:
        """Register a :class:`LoreSource` and add its name to ``names``
        so ``by_name`` can find the source file."""
        ...

    def replace_edge(self, old_id: str, new_edge: Edge) -> None:
        """Swap the edge stored under ``old_id`` for ``new_edge``.

        This is the single write chokepoint for :func:`retcon` and
        :func:`mark_verified`. The edge keeps its id. When the
        subject/relation/object of ``new_edge`` differ from the old
        edge the indexes are re-pointed; when they are identical the
        new edge takes over the same list positions in place.

        Raises :class:`KeyError` if ``old_id`` is not in the graph.
        """
        ...

    def remove_entity(self, name: str) -> int:
        """Drop ``name`` together with every edge that touches it and
        return the number of edges removed."""
        ...

    def remove_entity_of_type(self, name: str, type_: str) -> None:
        """Drop ``name`` from the type index only; edges are left
        untouched."""
        ...

    def rename_entity(self, old: str, new: str) -> int:
        """Rename ``old`` → ``new`` everywhere, keeping the old name as
        an alias of ``new``. Returns the number of edges re-pointed."""
        ...

    def register_alias(self, canonical: str, alias: str) -> None:
        """Register ``alias`` as an alternative name for ``canonical``
        so that ``by_name(alias)`` resolves to ``canonical``."""
        ...

    def register_name(self, name: str) -> None:
        """Add ``name`` to the names set without an edge or a type
        tag. Used by :func:`build_graph` for entities that are
        mentioned in the codex but have no relations yet."""
        ...

    # -- polymorphic Layer 2 (slice 5T) ----------------------------------

    def add_domain_entity(self, entity: dict) -> None:
        """Add (or upsert) a :class:`DomainEntity` node.

        ``entity`` is a dict carrying at least ``id``, ``type``,
        ``name`` and ``setting_id``; ``properties`` is the free-form,
        type-checked payload from the template. The entity is also
        tagged into the ``DomainEntity`` type bucket so
        :func:`entities_of_type` can find it.
        """
        ...

    def add_relation_node(self, relation: dict) -> None:
        """Add a reified :class:`Relation` node linking two entities
        by id.

        ``relation`` carries ``from_id``, ``to_id``, ``type`` and
        optionally ``valid_from`` / ``valid_until`` / ``properties`` /
        ``sources`` / ``extraction_confidence`` /
        ``source_confidence``. Unlike :func:`add` (the v1 Edge writer,
        which indexes by name) this indexes by id, so the polymorphic
        query layer can resolve a template's declared
        ``to: person_xxx`` to the right node.
        """
        ...

    def add_type_template(self, spec: "TemplateSpec") -> None:
        """Add (or upsert) a :class:`TypeTemplate` node holding the
        parsed template spec. The ``TemplateRegistry`` (5T.4) calls
        this on :func:`reload`."""
        ...

    def find_type_template(self, template_id: str) -> Optional["TemplateSpec"]:
        """Return the registered template with ``template_id``, or
        ``None`` when no such template is loaded."""
        ...

    def domain_entities_of_type(self, type_value: str) -> set[str]:
        """Return the name of every DomainEntity whose ``type`` field
        equals ``type_value``.

        Not the same as :func:`entities_of_type`, which works on the
        v1 entity labels (``Person`` / ``Faction``); this one works on
        the polymorphic ``type_value`` field the template defines.
        """
        ...

    def find_relation_by_id(self, relation_id: str) -> Optional[dict]:
        """Return the reified :class:`Relation` node stored under
        ``relation_id``."""
        ...

    def relations_for(
        self, endpoint_id: str, *, direction: str = "out"
    ) -> list[dict]:
        """Return the :class:`Relation` nodes touching ``endpoint_id``.

        ``direction`` is ``"out"`` (the endpoint is the relation's
        from-side), ``"in"`` (the endpoint is the to-side) or
        ``"both"`` (either side). The in-memory backend indexes both
        directions for O(1) lookup; the Neo4j backend uses a single
        traversal in either direction.
        """
        ...

    def query_cypher(self, body: str, params: dict) -> list[dict]:
        """Run a read-only Cypher body and return the result rows.

        ``body`` is the static template query, already validated by
        the caller (the registry has run it through the Cypher
        allowlist); ``params`` is the bound-parameter dict from the
        tool call. Each row is a dict keyed by the variables the body
        RETURNs (e.g. ``{"m": {...}}``).

        The in-memory backend ships a *tiny* matcher that covers the
        patterns the 4 example templates need; anything more exotic is
        a documented follow-up. The Neo4j backend hands body + params
        straight to the driver.
        """
        ...

    # -- slice 6.2: Setting + Plane + EXISTS_IN (Layer 1) ----------------

    def add_setting(self, setting: "Setting") -> None:
        """Add (or upsert) a :class:`Setting` node.

        ``setting.id`` is the stable lookup key. The :class:`Setting`
        dataclass lives in :mod:`lore_engine_poc.setting`; the method
        takes the dataclass directly so writer and reader share one
        shape.
        """
        ...

    def add_plane(self, plane: "Plane") -> None:
        """Add (or upsert) a :class:`Plane` node.

        ``plane.id`` is the stable lookup key. ``plane.setting_id``
        must reference an existing :class:`Setting`'s id, but the
        in-memory backend does not enforce that at write time (it can
        be checked at migration time, per slice 6.4).
        """
        ...

    def find_setting(self, setting_id: str) -> Optional["Setting"]:
        """Return the registered :class:`Setting` with ``setting_id``."""
        ...

    def find_plane(self, plane_id: str) -> Optional["Plane"]:
        """Return the registered :class:`Plane` with ``plane_id``."""
        ...

    def planes_in_setting(self, setting_id: str) -> list["Plane"]:
        """Return every :class:`Plane` whose ``setting_id`` matches,
        in insertion order."""
        ...

    def add_exists_in(self, *, entity_id: str, setting_id: str) -> None:
        """Record the timeless fact ``entity_id EXISTS_IN setting_id``.

        Per ``docs/17-planes.md``, ``EXISTS_IN`` is the timeless
        type-assertion that an entity belongs to a Setting. The typed
        edge is added to ``EDGE_TYPES`` (slice 6.1); this method
        captures the *fact* so the reverse lookups
        (``entity_planes``, ``setting_entities``) are O(1).

        Idempotent: recording the same ``(entity_id, setting_id)``
        pair again is a no-op.
        """
        ...

    def entity_planes(self, entity_id: str) -> list[str]:
        """Return the setting ids ``entity_id`` has an ``EXISTS_IN``
        fact for, in insertion order."""
        ...

    def setting_entities(self, setting_id: str) -> set[str]:
        """Return the entity ids that have an ``EXISTS_IN`` fact for
        ``setting_id``. The result is a set because membership is not
        ordered — use :func:`entities_of_type` for a specific label."""
        ...
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# InMemoryGraph — the renamed slice-0/4/10 Graph dataclass
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class InMemoryGraph:
|
|
"""In-memory graph: name -> {relation -> [Edge, ...]}.
|
|
|
|
Slice 4.0 added two reverse-direction indexes to make
|
|
reverse-traversal tools O(1):
|
|
|
|
* ``edges_by_object`` — name -> [Edge, ...] indexed by the
|
|
edge's *object*. Lets tools like ``ancestors_of(person)``
|
|
or ``members_of(faction)`` answer "who points at X?" in a
|
|
single lookup instead of a full subject-side scan.
|
|
* ``entities_by_type`` — type -> {name, ...} for the
|
|
slice-0 ``Entity.type`` strings (``"npc"``, ``"faction"``,
|
|
``"location"``, etc.). Populated by ``build_graph`` for
|
|
every entity in the input. Read tools that need
|
|
"all factions" / "all locations" / "all NPCs" consult this
|
|
index.
|
|
|
|
Both indexes are **additive** — graphs built the slice-0/1/2
|
|
way (without these fields) are still valid; tools that need
|
|
the new indexes fall back to a full scan when they're empty.
|
|
"""
|
|
|
|
edges_by_subject: dict[str, dict[str, list[Edge]]] = field(default_factory=dict)
|
|
names: set[str] = field(default_factory=set)
|
|
# Side index: every LoreSource (markdown file or YAML file) keyed
|
|
# by its full path. Populated by :func:`build_graph` whenever it
|
|
# sees an :class:`Entity` (markdown path) or a ``_LORESOURCE_NODE``
|
|
# marker triple (structured-YAML path). Slice 1.3 makes the
|
|
# LoreSource a first-class node per AC 1.9, 1.10.
|
|
lore_sources: dict[str, LoreSource] = field(default_factory=dict)
|
|
# Slice 4.0 reverse indexes — see class docstring.
|
|
edges_by_object: dict[str, list[Edge]] = field(default_factory=dict)
|
|
entities_by_type: dict[str, set[str]] = field(default_factory=dict)
|
|
# Slice 10.0 — alternative names for entities (set by
|
|
# ``set_alias``). A name → set[alias, ...] map; the alias
|
|
# resolves to the canonical name via :func:`resolve_alias`.
|
|
aliases: dict[str, set[str]] = field(default_factory=dict)
|
|
# Slice 10.2 — id → edge reverse index, populated by
|
|
# :func:`add`. Lets the retcon / mark_verified write tools
|
|
# find a specific edge by its stable id in O(1). The
|
|
# subject / object indexes are still authoritative for
|
|
# query-style lookups.
|
|
edges_by_id: dict[str, Edge] = field(default_factory=dict)
|
|
|
|
# Slice 5T.1 — polymorphic Layer 2 storage.
|
|
# Domain entities keyed by id; the value is the full entity
|
|
# dict. Names are added to ``names`` and to the
|
|
# ``DomainEntity`` type bucket so :func:`by_name` and
|
|
# :func:`entities_of_type` can find them through the v1
|
|
# surface.
|
|
domain_entities: dict[str, dict] = field(default_factory=dict)
|
|
# Reified :Relation nodes keyed by id; the value is the
# relation dict. ``relation_nodes_by_endpoint`` maps an endpoint
# id to the ids of every relation touching it on either side
# (mirrors edges_by_object for v1).
|
|
relation_nodes: dict[str, dict] = field(default_factory=dict)
|
|
relation_nodes_by_endpoint: dict[str, set[str]] = field(default_factory=dict)
|
|
# Polymorphic type bucket: ``type_value -> {name, ...}``
|
|
# populated by :func:`add_domain_entity`. Distinct from
|
|
# ``entities_by_type`` (which is the v1 label bucket).
|
|
domain_entities_by_type: dict[str, set[str]] = field(default_factory=dict)
|
|
# :TypeTemplate storage keyed by template id.
|
|
type_templates: dict[str, "TemplateSpec"] = field(default_factory=dict)
|
|
|
|
# Slice 6.2 — Setting + Plane + EXISTS_IN (Layer 1) storage.
|
|
# ``settings`` is keyed by Setting.id; ``planes`` is keyed
|
|
# by Plane.id; the ``planes_by_setting`` index is the
|
|
# reverse lookup that powers ``planes_in_setting(...)``.
|
|
# ``exists_in`` is a set of (entity_id, setting_id) tuples
|
|
# — the timeless type-assertion per docs/17-planes.md —
|
|
# plus two endpoint indexes (``entities_by_setting``,
|
|
# ``settings_by_entity``) so reverse lookups stay O(1).
|
|
settings: dict[str, "Setting"] = field(default_factory=dict)
|
|
planes: dict[str, "Plane"] = field(default_factory=dict)
|
|
planes_by_setting: dict[str, list[str]] = field(default_factory=dict)
|
|
exists_in: set[tuple[str, str]] = field(default_factory=set)
|
|
entities_by_setting: dict[str, set[str]] = field(default_factory=dict)
|
|
settings_by_entity: dict[str, list[str]] = field(default_factory=dict)
|
|
|
|
# -- read side -----------------------------------------------------------
|
|
|
|
def edges_for_subject(
    self, subject: str, relation: Optional[str] = None
) -> list[Edge]:
    """Return the edges filed under ``subject``.

    With ``relation`` given, only that relation's edges come back.
    Without it, the edges of every relation are concatenated in the
    order the relations (and the edges inside them) were inserted.
    The result is always a fresh list; an unknown subject or relation
    yields ``[]``.
    """
    by_relation = self.edges_by_subject.get(subject, {})
    if relation is not None:
        return list(by_relation.get(relation, []))
    return [edge for bucket in by_relation.values() for edge in bucket]
|
|
|
|
def edges_for_object(
    self, object_: str, relation: Optional[str] = None
) -> list[Edge]:
    """Return the edges indexed under ``object_`` in ``edges_by_object``.

    That index is filled by :func:`add` for both endpoints of every
    edge, so the result covers edges where ``object_`` is the subject
    as well as edges where it is the object. When ``relation`` is
    given the result is narrowed to edges of that relation. A fresh
    list is returned either way.
    """
    touching = self.edges_by_object.get(object_, [])
    if relation is not None:
        return [edge for edge in touching if edge.relation == relation]
    return list(touching)
|
|
|
|
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
    """Return the edge stored under ``edge_id`` in the id index.

    A plain dict lookup on ``edges_by_id``; ``None`` is returned when
    the id is unknown (for instance because the edge was deleted).
    """
    return self.edges_by_id.get(edge_id)
|
|
|
|
def by_name(self, name: str) -> Optional[str]:
    """Resolve ``name`` to the canonical name stored in the graph.

    Lookup order:

    1. exact match against ``names``;
    2. case-insensitive match against ``names``;
    3. alias lookup through :func:`resolve_alias`.

    Returns ``None`` when all three fail.
    """
    if name in self.names:
        return name
    wanted = name.lower()
    match = next(
        (known for known in self.names if known.lower() == wanted), None
    )
    if match is not None:
        return match
    # Last resort: the name may be a registered alias.
    return self.resolve_alias(name)
|
|
|
|
def entities_of_type(self, type_: str) -> set[str]:
    """Return a copy of the names tagged with ``type_`` (empty set
    when the type bucket does not exist)."""
    return set(self.entities_by_type.get(type_, ()))
|
|
|
|
def all_entity_types(self) -> list[str]:
    """Return the keys of the type index as a list.

    Handy for migrations that have to walk every type bucket (e.g.
    ``update_entity`` relabelling).
    """
    return list(self.entities_by_type)
|
|
|
|
def lore_source(self, path: str) -> Optional[LoreSource]:
    """Return the :class:`LoreSource` registered under ``path``, or
    ``None`` when nothing is registered for that path."""
    return self.lore_sources.get(path)
|
|
|
|
def all_names(self) -> set[str]:
    """Return a copy of ``names``, so callers cannot mutate the
    graph's own set through the result."""
    return set(self.names)
|
|
|
|
# -- write side ----------------------------------------------------------
|
|
|
|
def add(self, edge: Edge) -> None:
    """Insert ``edge`` and update every index.

    Both endpoint names are added to ``names``. The edge is appended
    to the subject index (``subject -> relation -> [edges]``), to the
    reverse index under *both* of its endpoints (object first, then
    subject), and stored in the id index.
    """
    subject, target = edge.subject, edge.object
    self.names.update((subject, target))

    by_relation = self.edges_by_subject.setdefault(subject, {})
    by_relation.setdefault(edge.relation, []).append(edge)

    # The reverse index holds every edge under both endpoints so
    # "who points at X" is a single lookup.
    for endpoint in (target, subject):
        self.edges_by_object.setdefault(endpoint, []).append(edge)

    # A later edge with the same id overwrites the earlier one here;
    # the rest of the graph assumes edge ids are unique.
    self.edges_by_id[edge.edge_id] = edge
|
|
|
|
def add_entity_of_type(self, name: str, type_: str) -> None:
    """Register ``name`` and tag it with ``type_`` without adding an edge.

    The name goes into ``names`` and into the ``type_`` bucket of the
    type index (the bucket is created on first use). ``build_graph``
    calls this for every :class:`Entity` it sees, which is what lets
    read tools answer "what type is X?" or "give me all X's".
    """
    self.names.add(name)
    bucket = self.entities_by_type.setdefault(type_, set())
    bucket.add(name)
|
|
|
|
def add_lore_source(self, source: LoreSource) -> None:
    """Register ``source`` under its ``path``.

    Slice 1.3 makes the LoreSource a first-class node (AC 1.9, 1.10).
    A source with a non-empty ``name`` also has that name added to
    ``names`` so ``by_name`` can find the source file; a source
    registered again under the same path replaces the earlier one.
    """
    self.lore_sources[source.path] = source
    label = source.name
    if label:
        self.names.add(label)
|
|
|
|
def replace_edge(self, old_id: str, new_edge: Edge) -> None:
    """Replace the edge stored under ``old_id`` with ``new_edge``.

    Single chokepoint for :func:`retcon` and :func:`mark_verified`;
    it lifts the index surgery that used to be inlined in
    ``write_tools.py`` so any backend can implement the same
    semantics. The edge id never changes.

    * Same subject / relation / object: every occurrence of the old
      edge (matched by id) in the subject and object lists is
      overwritten in place, and the id index is updated.
    * Anything differs: the old edge is dropped from the indexes and
      ``new_edge`` is inserted through :func:`add`.

    Raises:
        KeyError: ``old_id`` is not in the graph.
        ValueError: ``new_edge.edge_id`` differs from ``old_id``.
    """
    current = self.edges_by_id.get(old_id)
    if current is None:
        raise KeyError(f"replace_edge: id {old_id!r} not in graph")
    if new_edge.edge_id != old_id:
        raise ValueError(
            f"replace_edge: new_edge.edge_id={new_edge.edge_id!r} "
            f"differs from old_id={old_id!r}; ids must match"
        )

    same_identity = (
        current.subject == new_edge.subject
        and current.relation == new_edge.relation
        and current.object == new_edge.object
    )
    if not same_identity:
        # Subject / relation / object moved: unhook the old edge and
        # file the new one under its new identity.
        self._remove_edge_from_indexes(old_id, current)
        self.add(new_edge)
        return

    # Same identity: overwrite in place so list positions are kept.
    subject_buckets = [
        bucket
        for rel_map in self.edges_by_subject.values()
        for bucket in rel_map.values()
    ]
    for bucket in (*subject_buckets, *self.edges_by_object.values()):
        for position, candidate in enumerate(bucket):
            if candidate.edge_id == old_id:
                bucket[position] = new_edge
    self.edges_by_id[old_id] = new_edge
|
|
|
|
def _remove_edge_from_indexes(self, edge_id: str, edge: Edge) -> None:
|
|
"""Internal helper: drop ``edge`` (looked up by id) from
|
|
all four indexes. Used by :func:`replace_edge` and
|
|
:func:`remove_entity`."""
|
|
if edge.subject in self.edges_by_subject:
|
|
old_rels = self.edges_by_subject[edge.subject]
|
|
if edge.relation in old_rels:
|
|
old_list = old_rels[edge.relation]
|
|
old_list[:] = [e for e in old_list if e.edge_id != edge_id]
|
|
if not old_list:
|
|
del old_rels[edge.relation]
|
|
if not old_rels:
|
|
del self.edges_by_subject[edge.subject]
|
|
for endpoint in (edge.object, edge.subject):
|
|
if endpoint in self.edges_by_object:
|
|
obj_list = self.edges_by_object[endpoint]
|
|
obj_list[:] = [e for e in obj_list if e.edge_id != edge_id]
|
|
if not obj_list:
|
|
del self.edges_by_object[endpoint]
|
|
self.edges_by_id.pop(edge_id, None)
|
|
|
|
def remove_entity(self, name: str) -> int:
    """Delete ``name`` and every edge that has it as subject or object.

    The subject index, the reverse index and the id index are all
    purged, and empty lists / maps are dropped so no dangling entries
    remain. ``name`` is also removed from ``names``, from every type
    bucket (empty buckets are deleted) and from the alias map. Used by
    the slice-10 ``delete_entity`` tool.

    Returns the number of edges removed, counted on the subject index.
    """

    def touches(edge) -> bool:
        return edge.subject == name or edge.object == name

    # Gather the doomed ids up front; the id index is cleaned last.
    doomed_ids = {
        edge.edge_id
        for rel_map in self.edges_by_subject.values()
        for bucket in rel_map.values()
        for edge in bucket
        if touches(edge)
    }

    removed = 0
    for subject, rel_map in list(self.edges_by_subject.items()):
        for rel, bucket in list(rel_map.items()):
            kept = [edge for edge in bucket if not touches(edge)]
            removed += len(bucket) - len(kept)
            if kept:
                rel_map[rel] = kept
            else:
                del rel_map[rel]
        if not rel_map:
            del self.edges_by_subject[subject]

    for endpoint, bucket in list(self.edges_by_object.items()):
        kept = [edge for edge in bucket if not touches(edge)]
        if kept:
            self.edges_by_object[endpoint] = kept
        else:
            del self.edges_by_object[endpoint]

    for doomed in doomed_ids:
        self.edges_by_id.pop(doomed, None)

    self.names.discard(name)
    for type_, members in list(self.entities_by_type.items()):
        members.discard(name)
        if not members:
            del self.entities_by_type[type_]
    self.aliases.pop(name, None)
    return removed
|
|
|
|
def remove_entity_of_type(self, name: str, type_: str) -> None:
    """Remove ``name`` from the ``type_`` bucket of the type index.

    Edges and ``names`` are left alone. A bucket that ends up empty
    is deleted; an unknown ``type_`` is a no-op. Used by
    ``update_entity`` when a type tag is being demoted without
    removing the entity.
    """
    members = self.entities_by_type.get(type_)
    if members is None:
        return
    members.discard(name)
    if not members:
        self.entities_by_type.pop(type_, None)
|
|
|
|
def rename_entity(self, old: str, new: str) -> int:
    """Rename ``old`` to ``new`` across names, type tags, edges and aliases.

    Every edge that mentions ``old`` (as subject, object, or both) is
    replaced by a copy that points at ``new`` in all three edge
    indexes (``edges_by_subject``, ``edges_by_object`` and
    ``edges_by_id``), so the indexes agree with each other afterwards.
    ``old`` is kept as an alias of ``new``, together with any aliases
    ``old`` already had. Used by the slice-10 ``update_entity`` tool.

    Returns the number of edges whose *subject* was ``old`` (0 when
    ``old == new``). Edges that only had ``old`` as their object are
    re-pointed as well but are not included in the count, which keeps
    the value this method has always reported.
    """
    if old == new:
        return 0

    # Names + type tags.
    if old in self.names:
        self.names.discard(old)
        self.names.add(new)
    for members in self.entities_by_type.values():
        if old in members:
            members.discard(old)
            members.add(new)

    def repoint(edge):
        """Return ``edge`` with every ``old`` endpoint swapped for ``new``."""
        changes = {}
        if edge.subject == old:
            changes["subject"] = new
        if edge.object == old:
            changes["object"] = new
        if not changes:
            return edge
        # Edges are treated as immutable, so build a copy and keep the
        # id index pointing at the rewritten edge so retcon /
        # mark_verified can still find it.
        fresh = replace(edge, **changes)
        self.edges_by_id[edge.edge_id] = fresh
        return fresh

    # Subject index, step 1: edges filed under *other* subjects (this
    # includes an already-existing ``new``) may have ``old`` as their
    # object and must be rewritten where they sit.
    own_edges = self.edges_by_subject.pop(old, None)
    for rel_map in self.edges_by_subject.values():
        for bucket in rel_map.values():
            bucket[:] = [repoint(e) for e in bucket]

    # Subject index, step 2: move ``old``'s own edges under ``new``.
    re_pointed = 0
    if own_edges is not None:
        target = self.edges_by_subject.setdefault(new, {})
        for rel, bucket in own_edges.items():
            moved = [repoint(e) for e in bucket]
            if moved:
                target.setdefault(rel, []).extend(moved)
                re_pointed += len(moved)

    # Reverse index: entries filed under ``old`` move to ``new``; every
    # list is rewritten because ``old`` may be the other endpoint.
    rebuilt = {}
    for endpoint, bucket in self.edges_by_object.items():
        key = new if endpoint == old else endpoint
        for e in bucket:
            rebuilt.setdefault(key, []).append(repoint(e))
    self.edges_by_object = rebuilt

    # Aliases: the old name always survives as an alias of the new
    # canonical (``merge_entities`` relies on that to resolve it via
    # ``by_name``), and the aliases of ``old`` are carried over.
    carried = self.aliases.pop(old, set())
    alias_set = self.aliases.setdefault(new, set())
    alias_set.update(carried)
    alias_set.add(old)
    # A canonical name is never its own alias (same rule as
    # register_alias); this matters when renaming to a former alias.
    alias_set.discard(new)
    return re_pointed
|
|
|
|
def register_alias(self, canonical: str, alias: str) -> None:
    """Record ``alias`` as an alternative name for ``canonical``.

    No check is made that ``canonical`` is already in the names set,
    so an alias may be registered before its canonical entity is
    materialized. Registering a name as an alias of itself is
    ignored. :func:`by_name` consults aliases only as a fallback,
    after the exact and case-insensitive lookups have failed.
    """
    if alias != canonical:
        self.aliases.setdefault(canonical, set()).add(alias)
|
|
|
|
def register_name(self, name: str) -> None:
    """Make ``name`` known to the graph without any edge or type tag.

    Only the names set is touched. :func:`build_graph` uses this for
    entities that are mentioned in the codex but have no relations
    yet.
    """
    self.names.add(name)
|
|
|
|
def resolve_alias(self, alias: str) -> Optional[str]:
    """Return the canonical name ``alias`` is registered for, if any.

    Canonical entries are checked in insertion order; a match is
    either exact or case-insensitive. Returns ``None`` when no alias
    set contains ``alias``. Used by the slice-10 ``set_alias`` /
    ``update_entity`` read paths.
    """
    # Lower-case the needle once instead of once per canonical entry.
    wanted = alias.lower()
    for canonical, alias_set in self.aliases.items():
        if alias in alias_set or any(a.lower() == wanted for a in alias_set):
            return canonical
    return None
|
|
|
|
# -- slice 5T.1: polymorphic Layer 2 --------------------------------
|
|
|
|
def add_domain_entity(self, entity: dict) -> None:
    """Add (or upsert) a :class:`DomainEntity` node.

    ``entity`` must carry non-empty ``id``, ``type`` and ``name``
    values; these are the only keys validated here (``setting_id``,
    ``properties``, ``summary`` and ``sources`` are stored as given).
    The dict is stored under its id, replacing any entity already
    stored under that id. The entity's name is added to ``names``, to
    the v1 ``"DomainEntity"`` type bucket and to the polymorphic
    bucket keyed on its ``type`` value.

    Raises:
        ValueError: ``id``, ``type`` or ``name`` is missing or empty.
    """
    eid, name, type_value = (entity.get(key) for key in ("id", "name", "type"))
    if not (eid and name and type_value):
        raise ValueError(
            f"add_domain_entity: id/type/name are required, got {entity!r}"
        )

    self.domain_entities[eid] = entity
    self.names.add(name)

    # Two buckets: the v1 label bucket (so
    # entities_of_type('DomainEntity') finds every polymorphic node)
    # and the template-defined ``type`` bucket.
    for index, bucket_key in (
        (self.entities_by_type, "DomainEntity"),
        (self.domain_entities_by_type, type_value),
    ):
        index.setdefault(bucket_key, set()).add(name)
|
|
|
|
def add_relation_node(self, relation: dict) -> None:
    """Store a reified :class:`Relation` record and index its
    endpoints.

    ``id``, ``from_id``, ``to_id`` and ``type`` must all be
    present and truthy. The record is stored under its ``id``
    and indexed under both endpoints. Any endpoint not already
    in ``names`` is registered as a name and tagged in the
    ``"DomainEntity"`` bucket, so template instances that
    reference bare slugs (``person_vex_silent``) become nodes
    that :func:`by_name` and :func:`all_names` can find.

    Raises:
        ValueError: if any of the four required keys is missing
            or falsy.
    """
    rid, from_id, to_id, rel_type = (
        relation.get(key) for key in ("id", "from_id", "to_id", "type")
    )
    if not (rid and from_id and to_id and rel_type):
        raise ValueError(
            f"add_relation_node: id/from_id/to_id/type are required, got {relation!r}"
        )
    self.relation_nodes[rid] = relation
    endpoints = (from_id, to_id)
    # Pass 1: endpoint index. Keys are endpoint *ids*; the
    # registry/template layer resolves references such as
    # ``to: person_vex_silent`` to ids at ingest time. Names
    # are also accepted (deferred) for forward-compat.
    for endpoint in endpoints:
        self.relation_nodes_by_endpoint.setdefault(endpoint, set()).add(rid)
    # Pass 2: register endpoints that are not yet known names.
    for endpoint in endpoints:
        if endpoint in self.names:
            continue
        self.names.add(endpoint)
        self.entities_by_type.setdefault("DomainEntity", set()).add(endpoint)
|
|
|
|
def add_type_template(self, spec: "TemplateSpec") -> None:
    """Store ``spec`` under ``spec.id``, replacing any
    :class:`TypeTemplate` previously registered with that id.

    Raises:
        ValueError: if ``spec.id`` is falsy.
    """
    if spec.id:
        self.type_templates[spec.id] = spec
        return
    raise ValueError("add_type_template: spec.id is required")
|
|
|
|
def find_type_template(self, template_id: str) -> Optional["TemplateSpec"]:
    """Return the template registered under ``template_id``, or
    ``None`` when no such template has been added."""
    registry = self.type_templates
    return registry.get(template_id)
|
|
|
|
def domain_entities_of_type(self, type_value: str) -> set[str]:
    """Names of every DomainEntity whose ``type`` field equals
    ``type_value``.

    This reads the polymorphic type index, not the v1 label
    index behind :func:`entities_of_type`. The result is a fresh
    set, so callers may mutate it without touching the index;
    an unknown ``type_value`` yields an empty set.
    """
    matching = self.domain_entities_by_type.get(type_value, ())
    return set(matching)
|
|
|
|
def find_relation_by_id(self, relation_id: str) -> Optional[dict]:
    """Return the stored :class:`Relation` record for
    ``relation_id``, or ``None`` when the id is unknown."""
    stored = self.relation_nodes
    return stored.get(relation_id)
|
|
|
|
def relations_for(
    self, endpoint_id: str, *, direction: str = "out"
) -> list[dict]:
    """Return the :class:`Relation` records attached to
    ``endpoint_id``.

    ``direction`` selects which side ``endpoint_id`` must be on:

    * ``"out"`` (default): records whose ``from_id`` is the endpoint
    * ``"in"``: records whose ``to_id`` is the endpoint
    * ``"both"``: every record indexed under the endpoint

    The result order follows iteration of the endpoint index
    entry and should be treated as unspecified. An unknown
    endpoint yields an empty list.

    Raises:
        ValueError: if ``direction`` is not one of the three
            accepted values.
    """
    if direction not in ("out", "in", "both"):
        raise ValueError(
            f"relations_for: direction must be 'out', 'in', or 'both', got {direction!r}"
        )
    touching = [
        self.relation_nodes[rid]
        for rid in self.relation_nodes_by_endpoint.get(endpoint_id, set())
    ]
    if direction == "both":
        return touching
    # "out" filters on the source side, "in" on the target side.
    side = "from_id" if direction == "out" else "to_id"
    return [rel for rel in touching if rel[side] == endpoint_id]
|
|
|
|
# -- slice 6.2: Setting + Plane + EXISTS_IN (Layer 1) --------------
|
|
|
|
def add_setting(self, setting: "Setting") -> None:
    """Store ``setting`` under ``setting.id``, overwriting any
    :class:`Setting` already registered with that id.

    Per ``docs/17-planes.md``, the id is the stable lookup key;
    display names and kinds are non-contract metadata that may
    be revised without re-keying the graph.
    """
    setting_key = setting.id
    self.settings[setting_key] = setting
|
|
|
|
def find_setting(self, setting_id: str) -> Optional["Setting"]:
    """Return the :class:`Setting` registered under
    ``setting_id``, or ``None`` when there is none."""
    registered = self.settings
    return registered.get(setting_id)
|
|
|
|
def add_plane(self, plane: "Plane") -> None:
    """Register ``plane`` the first time its id is seen.

    A new plane is stored under ``plane.id`` and its id is
    appended to the ``planes_by_setting`` list for
    ``plane.setting_id``, which keeps insertion order for
    :func:`planes_in_setting`.

    A later call with an id that is already registered changes
    nothing: the stored :class:`Plane`, its original
    ``setting_id`` and its position in the index all stay as
    they were. Moving a plane between settings is a slice 6.4
    migration concern, not a write-path concern.
    """
    pid = plane.id
    if pid not in self.planes:
        self.planes[pid] = plane
        self.planes_by_setting.setdefault(plane.setting_id, []).append(pid)
|
|
|
|
def find_plane(self, plane_id: str) -> Optional["Plane"]:
    """Return the :class:`Plane` registered under ``plane_id``,
    or ``None`` when there is none."""
    registered = self.planes
    return registered.get(plane_id)
|
|
|
|
def planes_in_setting(self, setting_id: str) -> list["Plane"]:
    """Return the :class:`Plane` objects indexed under
    ``setting_id``, in the order they were added.

    The list is empty when the setting has no planes or is
    unknown.
    """
    ordered_ids = self.planes_by_setting.get(setting_id, [])
    planes = self.planes
    return [planes[plane_id] for plane_id in ordered_ids]
|
|
|
|
def add_exists_in(self, *, entity_id: str, setting_id: str) -> None:
    """Record the timeless ``EXISTS_IN`` fact for an
    (entity, setting) pair; repeating a pair is a no-op.

    A new pair is added to ``exists_in`` and to both reverse
    indexes: ``entities_by_setting`` (a set per setting) and
    ``settings_by_entity`` (a list per entity, in insertion
    order).

    No check is made that the :class:`Setting` is registered.
    Per the design notes, the fact is stored as-is and the
    slice 6.4 migration backfill is expected to materialise
    missing Setting nodes: the edge is a fact about the entity,
    the Setting node a structural placeholder.
    """
    membership = (entity_id, setting_id)
    if membership not in self.exists_in:
        self.exists_in.add(membership)
        self.entities_by_setting.setdefault(setting_id, set()).add(entity_id)
        self.settings_by_entity.setdefault(entity_id, []).append(setting_id)
|
|
|
|
def entity_planes(self, entity_id: str) -> list[str]:
    """Return the setting ids ``entity_id`` has an ``EXISTS_IN``
    fact for, in insertion order.

    Despite the method name, the values are *setting* ids read
    from ``settings_by_entity``. The result is a fresh list;
    it is empty when the entity has no memberships.
    """
    memberships = self.settings_by_entity.get(entity_id, ())
    return [*memberships]
|
|
|
|
def setting_entities(self, setting_id: str) -> set[str]:
    """Return the ids of all entities with an ``EXISTS_IN`` fact
    for ``setting_id``.

    The result is a fresh set; it is empty when the setting has
    no memberships.
    """
    members = self.entities_by_setting.get(setting_id, ())
    return set(members)
|
|
|
|
def query_cypher(self, body: str, params: dict) -> list[dict]:
    """Execute a caller-validated, read-only Cypher ``body``
    against this in-memory graph and return the result rows.

    The work is delegated to ``run_in_memory`` in
    ``templates.cypher_runtime``. Per the original
    documentation this is *not* a full Cypher engine; it covers
    the patterns the 4 example templates need:

    * ``MATCH (n:DomainEntity {type: 'X', k: $p}) RETURN n``
    * ``OPTIONAL MATCH (n)-[r]->(m)`` chained clauses
    * ``WHERE n.k = $p`` filters
    * ``RETURN n`` or ``RETURN m``
    * ``ORDER BY n.k [ASC|DESC]``
    * ``LIMIT N`` and ``SKIP N``

    The allowlist (slice 5T.3) is the *only* safety boundary.
    The body is expected to have been validated before it gets
    here; this method executes it faithfully and does not
    enforce the allowlist again.

    Anything more exotic is documented to raise
    :class:`NotImplementedError` with a clear message;
    expanding the matcher is a documented follow-up.
    """
    # Imported lazily: the cypher_allowlist module lives in the
    # templates subpackage and imports back into graph_backend
    # via the Protocol type hint, so a top-level import here
    # would form a cycle.
    from .templates.cypher_runtime import run_in_memory

    rows = run_in_memory(body, params, self)
    return rows