slice 5.1: GraphBackend Protocol + InMemoryGraph rename + replace_edge

- Lift Graph dataclass from tools.py into graph_backend.py as
  InMemoryGraph (the slice-0/4/10 body, byte-identical).
- New GraphBackend Protocol (PEP 544 + @runtime_checkable) with
  16 methods (7 read, 9 write). Mirrors the LLMProvider
  pattern in lore_engine_poc/llm.py:47-48.
- tools.Graph is now a back-compat alias (Graph = InMemoryGraph).
  Zero test churn across the 559 existing tests.
- New replace_edge(old_id, new_edge) chokepoint. Lifts the
  inlined index surgery that lived in write_tools.py retcon +
  mark_verified. Same-endpoint swap is in-place; subject/
  relation/object change drops + re-adds.
- New helpers: edges_for_subject, edges_for_object, entities_of_type,
  lore_source, all_names, add_lore_source, remove_entity_of_type,
  register_alias, register_name.
- 16 contract tests in tests/test_tools/test_graph_backend.py.
- Suite: 559 -> 575. No regressions.

Co-Authored-By: Claude <noreply@anthropic.com>
Lore Engine Dev
2026-06-18 21:15:56 -04:00
parent 4fb0fc4ae2
commit ec0083a15b
3 changed files with 817 additions and 221 deletions
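
The Protocol seam described in the commit message relies on PEP 544 structural typing. The following is a minimal sketch of that idea, not the project's code: `TinyBackend`, `DictGraph`, and `NotAGraph` are hypothetical names used only for illustration. It suggests why `isinstance` against a `@runtime_checkable` Protocol can succeed for a dataclass that never inherits from it.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class TinyBackend(Protocol):
    """Hypothetical two-method stand-in for GraphBackend."""

    def by_name(self, name: str) -> Optional[str]: ...

    def all_names(self) -> set[str]: ...


@dataclass
class DictGraph:
    """Does not inherit from TinyBackend; it conforms structurally."""

    names: set[str] = field(default_factory=set)

    def by_name(self, name: str) -> Optional[str]:
        return name if name in self.names else None

    def all_names(self) -> set[str]:
        return set(self.names)


class NotAGraph:
    """Lacks all_names, so the runtime check rejects it."""

    def by_name(self, name: str) -> Optional[str]:
        return None


# A back-compat alias is a second name bound to the same class object.
Graph = DictGraph

print(isinstance(Graph(), TinyBackend))
print(isinstance(NotAGraph(), TinyBackend))
print(Graph is DictGraph)
```

Note that a runtime-checkable Protocol only checks that the named methods exist; it does not validate signatures or return types, so contract tests are still needed for behaviour.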


@@ -0,0 +1,547 @@
"""GraphBackend Protocol + in-memory implementation (slice 5.1).
This module is the seam for slice 5's storage-strategy work
(docs/12-storage-strategy.md). The 36 MCP tools and the
consistency engine today read from an in-memory :class:`Graph`
dataclass held in :mod:`lore_engine_poc.tools`; that class
moves here as :class:`InMemoryGraph` and a Protocol
(:class:`GraphBackend`) defines the contract any backend
(in-memory or Neo4j) must satisfy.
Slice 5.3+ adds :class:`Neo4jGraph` in
:mod:`lore_engine_poc.neo4j_graph` implementing the same
Protocol; slice 5.7 wires ``LORE_GRAPH_BACKEND=neo4j`` through
the MCP entry scripts to select it at startup.
Why a Protocol and not an ABC: the existing
:mod:`lore_engine_poc.llm` module already uses PEP 544
``@runtime_checkable Protocol`` for its :class:`LLMProvider`
seam. Mirroring that pattern keeps the codebase internally
consistent and lets ``isinstance(g, GraphBackend)`` work
without forcing inheritance on the in-memory dataclass.
The in-memory implementation is **byte-identical** to the
slice-0/4/10 :class:`Graph` body that lived in
:mod:`lore_engine_poc.tools`; ``tools.Graph`` is now a
back-compat alias (``Graph = InMemoryGraph``) so the 559
existing tests keep passing without edits. Slice 5.2 migrates
the 40 direct-attribute reads (e.g.
``graph.edges_by_subject[name][rel]``) to method calls so
the dict shape can become private in a later slice.
"""
from __future__ import annotations
from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING, Optional, Protocol, runtime_checkable
from .parsers import LoreSource
if TYPE_CHECKING:
# ``Edge`` is defined in :mod:`lore_engine_poc.tools`, which
# imports ``InMemoryGraph`` from this module. Importing
# ``Edge`` at runtime would form a circular import; the
# ``TYPE_CHECKING`` guard keeps type hints correct without
# paying the import cost (or the cycle).
from .tools import Edge
# ---------------------------------------------------------------------------
# Protocol — the contract every backend must satisfy
# ---------------------------------------------------------------------------
@runtime_checkable
class GraphBackend(Protocol):
"""The contract the 36 MCP tools + consistency engine rely on.
Methods are kept narrow on purpose: anything tools do
today (read or write) must be expressible as a method
call. Slice 5.2 is the work that makes this true for
the read side (40 sites migrated from dict access to
method calls). Slice 5.5 is the work that makes this
true for the write side on the Neo4j backend.
"""
# -- read side -----------------------------------------------------------
def edges_for_subject(
self, subject: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges whose subject is ``subject``. If ``relation``
is provided, only edges with that relation. Returned in
insertion order."""
...
def edges_for_object(
self, object_: str, relation: Optional[str] = None
) -> list[Edge]:
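"""All edges that have ``object_`` as either endpoint. The
in-memory backend indexes every edge under both its subject
and its object, so reverse traversals are a single dict
lookup. If ``relation`` is provided, only edges with that
relation."""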
...
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
"""O(1) id lookup. Returns ``None`` if the id is not in
the graph (e.g. the edge was deleted)."""
...
def by_name(self, name: str) -> Optional[str]:
"""Resolve a name to its canonical form (case-insensitive
+ alias fallback)."""
...
def entities_of_type(self, type_: str) -> set[str]:
"""All entity names tagged with ``type_``."""
...
def lore_source(self, path: str) -> Optional[LoreSource]:
"""The :class:`LoreSource` registered for ``path``, or
``None`` if no source file at that path has been
ingested."""
...
def all_names(self) -> set[str]:
"""Every entity name the graph knows about. The in-memory
implementation exposes ``self.names``; Neo4j needs a
roundtrip to deliver this."""
...
# -- write side ----------------------------------------------------------
def add(self, edge: Edge) -> None:
"""Insert a new edge. Indexes are updated atomically
(in the in-memory backend; via a single transaction
in the Neo4j backend)."""
...
def add_entity_of_type(self, name: str, type_: str) -> None:
"""Tag an entity into the type index without an edge."""
...
def add_lore_source(self, source: LoreSource) -> None:
"""Register a :class:`LoreSource` and add its name to
``names`` (so ``by_name`` can find the source file)."""
...
def replace_edge(self, old_id: str, new_edge: Edge) -> None:
"""The single write chokepoint for :func:`retcon` and
:func:`mark_verified`. Drops the edge with ``old_id``
from the indexes and inserts ``new_edge`` under the
same id. If the subject/relation/object of ``new_edge``
differ from the old, the indexes are re-pointed; if
they're identical, the new edge swaps into the same
list positions in place.
Raises :class:`KeyError` if ``old_id`` is not in the
graph, and :class:`ValueError` if ``new_edge.edge_id``
differs from ``old_id``.
"""
...
def remove_entity(self, name: str) -> int:
"""Drop ``name`` and every edge that touches it.
Returns the number of edges removed."""
...
def remove_entity_of_type(self, name: str, type_: str) -> None:
"""Drop ``name`` from the type index only. No edges
are touched."""
...
def rename_entity(self, old: str, new: str) -> int:
"""Rename ``old`` → ``new`` everywhere. The old name
is preserved as an alias of ``new``. Returns the
number of edges re-pointed."""
...
def register_alias(self, canonical: str, alias: str) -> None:
"""Add ``alias`` as an alternative name for ``canonical``.
``by_name(alias)`` then resolves to ``canonical``."""
...
def register_name(self, name: str) -> None:
"""Add ``name`` to the names set without an edge or a
type tag. Used by :func:`build_graph` for entities that
are mentioned in the codex but have no relations yet."""
...
# ---------------------------------------------------------------------------
# InMemoryGraph — the renamed slice-0/4/10 Graph dataclass
# ---------------------------------------------------------------------------
@dataclass
class InMemoryGraph:
"""In-memory graph: name -> {relation -> [Edge, ...]}.
Slice 4.0 added two reverse-direction indexes to make
reverse-traversal tools O(1):
* ``edges_by_object`` — name -> [Edge, ...] indexed by the
edge's *object*. Lets tools like ``ancestors_of(person)``
or ``members_of(faction)`` answer "who points at X?" in a
single lookup instead of a full subject-side scan.
* ``entities_by_type`` — type -> {name, ...} for the
slice-0 ``Entity.type`` strings (``"npc"``, ``"faction"``,
``"location"``, etc.). Populated by ``build_graph`` for
every entity in the input. Read tools that need
"all factions" / "all locations" / "all NPCs" consult this
index.
Both indexes are **additive** — graphs built the slice-0/1/2
way (without these fields) are still valid; tools that need
the new indexes fall back to a full scan when they're empty.
"""
edges_by_subject: dict[str, dict[str, list[Edge]]] = field(default_factory=dict)
names: set[str] = field(default_factory=set)
# Side index: every LoreSource (markdown file or YAML file) keyed
# by its full path. Populated by :func:`build_graph` whenever it
# sees an :class:`Entity` (markdown path) or a ``_LORESOURCE_NODE``
# marker triple (structured-YAML path). Slice 1.3 makes the
# LoreSource a first-class node per AC 1.9, 1.10.
lore_sources: dict[str, LoreSource] = field(default_factory=dict)
# Slice 4.0 reverse indexes — see class docstring.
edges_by_object: dict[str, list[Edge]] = field(default_factory=dict)
entities_by_type: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.0 — alternative names for entities (set by
# ``set_alias``). A name → set[alias, ...] map; the alias
# resolves to the canonical name via :func:`resolve_alias`.
aliases: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.2 — id → edge reverse index, populated by
# :func:`add`. Lets the retcon / mark_verified write tools
# find a specific edge by its stable id in O(1). The
# subject / object indexes are still authoritative for
# query-style lookups.
edges_by_id: dict[str, Edge] = field(default_factory=dict)
# -- read side -----------------------------------------------------------
def edges_for_subject(
self, subject: str, relation: Optional[str] = None
) -> list[Edge]:
"""All edges whose subject is ``subject``. If ``relation``
is provided, only edges with that relation. Returned in
insertion order."""
rel_map = self.edges_by_subject.get(subject, {})
if relation is None:
out: list[Edge] = []
for edges in rel_map.values():
out.extend(edges)
return out
return list(rel_map.get(relation, []))
def edges_for_object(
self, object_: str, relation: Optional[str] = None
) -> list[Edge]:
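"""All edges that have ``object_`` as either endpoint. The
in-memory backend indexes every edge under both its subject
and its object, so reverse traversals are a single dict
lookup. If ``relation`` is provided, only edges with that
relation."""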
edges = self.edges_by_object.get(object_, [])
if relation is None:
return list(edges)
return [e for e in edges if e.relation == relation]
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
"""O(1) lookup by stable per-edge id. Returns ``None`` if
the id isn't in the graph (e.g. the edge was deleted)."""
return self.edges_by_id.get(edge_id)
def by_name(self, name: str) -> Optional[str]:
"""Resolve a name to a canonical form (case-insensitive).
Also follows aliases: if ``name`` matches an alias, the
canonical name is returned.
"""
if name in self.names:
return name
low = name.lower()
for n in self.names:
if n.lower() == low:
return n
aliased = self.resolve_alias(name)
if aliased is not None:
return aliased
return None
def entities_of_type(self, type_: str) -> set[str]:
"""All entity names tagged with ``type_``."""
return set(self.entities_by_type.get(type_, set()))
def lore_source(self, path: str) -> Optional[LoreSource]:
return self.lore_sources.get(path)
def all_names(self) -> set[str]:
return set(self.names)
# -- write side ----------------------------------------------------------
def add(self, edge: Edge) -> None:
self.names.add(edge.subject)
self.names.add(edge.object)
self.edges_by_subject.setdefault(edge.subject, {}).setdefault(
edge.relation, []
).append(edge)
# Maintain the reverse-direction index. We index *every*
# edge by both endpoints so "who points at X" is O(1).
self.edges_by_object.setdefault(edge.object, []).append(edge)
self.edges_by_object.setdefault(edge.subject, []).append(edge)
# Id index. If two edges with the same id are added, the
# second one wins — this is defensive; the rest of the
# graph assumes edge ids are unique.
self.edges_by_id[edge.edge_id] = edge
def add_entity_of_type(self, name: str, type_: str) -> None:
"""Tag an entity into the type index without adding an edge.
Read tools that need to know "what type is X?" or "give me
all X's" populate this directly. ``build_graph`` calls
this for every :class:`Entity` it sees.
"""
self.names.add(name)
self.entities_by_type.setdefault(type_, set()).add(name)
def add_lore_source(self, source: LoreSource) -> None:
"""Register a :class:`LoreSource`. Slice 1.3 makes the
LoreSource a first-class node per AC 1.9, 1.10; the
source's ``name`` is added to ``names`` so ``by_name``
can find the source file too.
"""
self.lore_sources[source.path] = source
if source.name:
self.names.add(source.name)
def replace_edge(self, old_id: str, new_edge: Edge) -> None:
"""Single chokepoint for :func:`retcon` and
:func:`mark_verified`. Lifts the inlined index surgery
that used to live in ``write_tools.py:447-481, 532-541``
so any backend (in-memory or Neo4j) can implement the
same semantics.
Behaviour:
* If ``new_edge`` has the same ``subject / relation /
object`` as the old edge, swap the old reference out
for the new one in the subject and object lists, and
update the id index. The id stays the same.
* If the subject, relation, or object differs, drop
the old edge from the subject / object indexes and
add the new one under its new identity. The id still
stays the same.
* ``new_edge.edge_id`` is the id we keep; if it differs
from ``old_id``, the contract is violated and this
raises :class:`ValueError`.
Raises :class:`KeyError` if ``old_id`` is not in the
graph.
"""
old = self.edges_by_id.get(old_id)
if old is None:
raise KeyError(f"replace_edge: id {old_id!r} not in graph")
if new_edge.edge_id != old_id:
raise ValueError(
f"replace_edge: new_edge.edge_id={new_edge.edge_id!r} "
f"differs from old_id={old_id!r}; ids must match"
)
if (
old.subject == new_edge.subject
and old.relation == new_edge.relation
and old.object == new_edge.object
):
# In-place swap under the same identity.
for rel_map in self.edges_by_subject.values():
for edges in rel_map.values():
for i, e in enumerate(edges):
if e.edge_id == old_id:
edges[i] = new_edge
for endpoint, edges in self.edges_by_object.items():
for i, e in enumerate(edges):
if e.edge_id == old_id:
edges[i] = new_edge
self.edges_by_id[old_id] = new_edge
return
# Subject / relation / object differ — drop from the old
# indexes and re-add under the new identity.
self._remove_edge_from_indexes(old_id, old)
self.add(new_edge)
def _remove_edge_from_indexes(self, edge_id: str, edge: Edge) -> None:
"""Internal helper: drop ``edge`` (matched by ``edge_id``)
from the three edge indexes (``edges_by_subject``,
``edges_by_object``, ``edges_by_id``). The ``names`` set is
left untouched. Used by :func:`replace_edge`."""
if edge.subject in self.edges_by_subject:
old_rels = self.edges_by_subject[edge.subject]
if edge.relation in old_rels:
old_list = old_rels[edge.relation]
old_list[:] = [e for e in old_list if e.edge_id != edge_id]
if not old_list:
del old_rels[edge.relation]
if not old_rels:
del self.edges_by_subject[edge.subject]
for endpoint in (edge.object, edge.subject):
if endpoint in self.edges_by_object:
obj_list = self.edges_by_object[endpoint]
obj_list[:] = [e for e in obj_list if e.edge_id != edge_id]
if not obj_list:
del self.edges_by_object[endpoint]
self.edges_by_id.pop(edge_id, None)
def remove_entity(self, name: str) -> int:
"""Remove ``name`` and every edge that touches it.
Cascades through ``edges_by_subject`` and ``edges_by_object``
so no dangling references remain. Returns the number of
edges removed. Used by the slice-10 ``delete_entity`` tool.
"""
# Collect the ids we're about to drop so we can update
# the ``edges_by_id`` index in one pass.
ids_to_drop = {
e.edge_id
for edges in self.edges_by_subject.values()
for edge_list in edges.values()
for e in edge_list
if e.subject == name or e.object == name
}
# Count first, then strip. We rebuild the indexes without
# ``name`` because Edge is immutable (dataclass) and the
# indexes are plain dicts.
removed = 0
for subject, rel_map in list(self.edges_by_subject.items()):
for rel, edges in list(rel_map.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
removed += len(edges) - len(survivors)
rel_map[rel] = survivors
if not survivors:
del rel_map[rel]
if not rel_map:
del self.edges_by_subject[subject]
for endpoint, edges in list(self.edges_by_object.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
self.edges_by_object[endpoint] = survivors
if not survivors:
del self.edges_by_object[endpoint]
for eid in ids_to_drop:
self.edges_by_id.pop(eid, None)
self.names.discard(name)
for type_, members in list(self.entities_by_type.items()):
members.discard(name)
if not members:
del self.entities_by_type[type_]
self.aliases.pop(name, None)
return removed
def remove_entity_of_type(self, name: str, type_: str) -> None:
"""Drop ``name`` from the type index only. No edges
are touched. Used by ``update_entity`` when a type
tag is being demoted without removing the entity."""
members = self.entities_by_type.get(type_)
if members is not None:
members.discard(name)
if not members:
self.entities_by_type.pop(type_, None)
def rename_entity(self, old: str, new: str) -> int:
"""Rename ``old`` → ``new`` in the names set, type index,
and every edge endpoint. Returns the number of edges
re-pointed. Used by the slice-10 ``update_entity`` tool."""
if old == new:
return 0
# Names + types
if old in self.names:
self.names.discard(old)
self.names.add(new)
for type_, members in self.entities_by_type.items():
if old in members:
members.discard(old)
members.add(new)
# Re-point edges. We rebuild edges_by_subject and
# edges_by_object because Edge objects themselves are
# immutable (dataclass) — replacing them is the cleanest
# path. For the POC's typical world size this is fine.
re_pointed = 0
# Subject index: keys are subjects; values are dicts of
# relation → list[Edge]. Some subjects == old.
if old in self.edges_by_subject:
sub_edges = self.edges_by_subject.pop(old)
self.edges_by_subject.setdefault(new, {})
for rel, edges in sub_edges.items():
for e in edges:
e_new = replace(e, subject=new)
self.edges_by_subject[new].setdefault(rel, []).append(e_new)
# Re-key the id index to the replaced edge so
# retcon / mark_verified can still find it.
self.edges_by_id[e.edge_id] = e_new
re_pointed += 1
# Object index: values are list[Edge]; the key is endpoint
# (object name or subject name, since we index both).
new_obj_index: dict[str, list[Edge]] = {}
for endpoint, edges in self.edges_by_object.items():
target = new if endpoint == old else endpoint
for e in edges:
if e.subject == old or e.object == old:
e_new = e
if e.subject == old:
e_new = replace(e_new, subject=new)
if e.object == old:
e_new = replace(e_new, object=new)
new_obj_index.setdefault(target, []).append(e_new)
# Same id index maintenance for the object-side
# rename path.
self.edges_by_id[e.edge_id] = e_new
else:
new_obj_index.setdefault(target, []).append(e)
# Merge: also rebuild subject index for object-side renames
# (edges where old was the object, not the subject).
self.edges_by_object = new_obj_index
# Re-derive edges_by_subject from edges_by_object for any
# edge where old was the *object*.
for endpoint, edges in new_obj_index.items():
for e in edges:
if e.subject == new:
self.edges_by_subject.setdefault(new, {}).setdefault(
e.relation, []
)
# Avoid duplicates from the subject-side rebuild.
if e not in self.edges_by_subject[new][e.relation]:
self.edges_by_subject[new][e.relation].append(e)
# Aliases: the rename preserves the old name as an alias
# pointing at the new canonical. This always happens —
# the world-builder may later call ``merge_entities``
# which relies on the alias being registered to find
# the canonical form via ``by_name``.
old_aliases = self.aliases.pop(old, set())
alias_set = self.aliases.setdefault(new, set())
if old_aliases:
alias_set.update(old_aliases)
alias_set.add(old)
return re_pointed
def register_alias(self, canonical: str, alias: str) -> None:
"""Add ``alias`` as an alternative name for ``canonical``.
The canonical name does not need to be in the names set
for the alias to register (callers may register an
alias before the canonical is materialized).
:func:`by_name` consults aliases only as a fallback, after
the exact and case-insensitive checks against the names
set miss; it then returns the canonical name whether or
not that name is in the names set.
"""
if alias == canonical:
return
self.aliases.setdefault(canonical, set()).add(alias)
def register_name(self, name: str) -> None:
"""Add ``name`` to the names set without an edge or type
tag. Used by :func:`build_graph` for entities that are
mentioned in the codex but have no relations yet."""
self.names.add(name)
def resolve_alias(self, alias: str) -> Optional[str]:
"""If ``alias`` is registered as an alternative name for
some canonical entity, return the canonical name. Otherwise
return ``None``. Used by the slice-10 ``set_alias`` /
``update_entity`` read paths."""
low = alias.lower()
for canonical, alias_set in self.aliases.items():
if alias in alias_set or low in {a.lower() for a in alias_set}:
return canonical
return None
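
The two `replace_edge` paths in the file above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the module's implementation: `TinyGraph` keeps only a subject bucket and an id index (the object-side index is omitted for brevity), the `Edge` here is a stand-in for the real class in `tools.py`, and the `verified` field is hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace


@dataclass
class Edge:
    """Simplified stand-in; the real Edge lives in tools.py."""

    edge_id: str
    subject: str
    relation: str
    object: str
    verified: bool = False  # hypothetical payload field


@dataclass
class TinyGraph:
    by_subject: dict[str, list[Edge]] = field(default_factory=dict)
    by_id: dict[str, Edge] = field(default_factory=dict)

    def add(self, edge: Edge) -> None:
        self.by_subject.setdefault(edge.subject, []).append(edge)
        self.by_id[edge.edge_id] = edge

    def replace_edge(self, old_id: str, new_edge: Edge) -> None:
        old = self.by_id.get(old_id)
        if old is None:
            raise KeyError(old_id)
        if new_edge.edge_id != old_id:
            raise ValueError("ids must match")
        same_identity = (old.subject, old.relation, old.object) == (
            new_edge.subject,
            new_edge.relation,
            new_edge.object,
        )
        if same_identity:
            # In-place swap: the edge keeps its list position.
            edges = self.by_subject[old.subject]
            edges[edges.index(old)] = new_edge
            self.by_id[old_id] = new_edge
            return
        # Identity changed: drop from the old bucket, re-add under the new one.
        remaining = [e for e in self.by_subject[old.subject] if e.edge_id != old_id]
        if remaining:
            self.by_subject[old.subject] = remaining
        else:
            del self.by_subject[old.subject]
        self.add(new_edge)


g = TinyGraph()
first = Edge("e1", "Alice", "MEMBER_OF", "House A")
second = Edge("e2", "Alice", "MEMBER_OF", "House B")
g.add(first)
g.add(second)

# mark_verified-style change: same endpoints, so e1 keeps its slot.
g.replace_edge("e1", replace(first, verified=True))
order_after_swap = [e.edge_id for e in g.by_subject["Alice"]]

# retcon-style change: new subject, so e1 moves to Bob's bucket.
g.replace_edge("e1", replace(g.by_id["e1"], subject="Bob"))
```

In both paths the id index ends up pointing at the replacement, which is what lets later id-based lookups keep working after a rewrite.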


@@ -124,228 +124,16 @@ def _windows_consistent(a_from, a_until, b_from, b_until) -> bool:
return True
@dataclass
class Graph:
"""In-memory graph: name -> {relation -> [Edge, ...]}.
# Slice 5.1 — the in-memory graph has moved to
# :mod:`lore_engine_poc.graph_backend` as :class:`InMemoryGraph`
# with a :class:`GraphBackend` Protocol. ``Graph`` is a
# back-compat alias so the 559 existing tests (and any external
# code that imports ``Graph``) keep working unchanged.
from .graph_backend import ( # noqa: F401, E402 -- re-export for back-compat
GraphBackend,
InMemoryGraph as Graph,
)
Slice 4.0 added two reverse-direction indexes to make
reverse-traversal tools O(1):
* ``edges_by_object`` — name -> [Edge, ...] indexed by the
edge's *object*. Lets tools like ``ancestors_of(person)``
or ``members_of(faction)`` answer "who points at X?" in a
single lookup instead of a full subject-side scan.
* ``entities_by_type`` — type -> {name, ...} for the
slice-0 ``Entity.type`` strings (``"npc"``, ``"faction"``,
``"location"``, etc.). Populated by ``build_graph`` for
every entity in the input. Read tools that need
"all factions" / "all locations" / "all NPCs" consult this
index.
Both indexes are **additive** — graphs built the slice-0/1/2
way (without these fields) are still valid; tools that need
the new indexes fall back to a full scan when they're empty.
"""
edges_by_subject: dict[str, dict[str, list[Edge]]] = field(default_factory=dict)
names: set[str] = field(default_factory=set)
# Side index: every LoreSource (markdown file or YAML file) keyed
# by its full path. Populated by :func:`build_graph` whenever it
# sees an :class:`Entity` (markdown path) or a ``_LORESOURCE_NODE``
# marker triple (structured-YAML path). Slice 1.3 makes the
# LoreSource a first-class node per AC 1.9, 1.10.
lore_sources: dict[str, LoreSource] = field(default_factory=dict)
# Slice 4.0 reverse indexes — see class docstring.
edges_by_object: dict[str, list[Edge]] = field(default_factory=dict)
entities_by_type: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.0 — alternative names for entities (set by
# ``set_alias``). A name → set[alias, ...] map; the alias
# resolves to the canonical name via :func:`resolve_alias`.
aliases: dict[str, set[str]] = field(default_factory=dict)
# Slice 10.2 — id → edge reverse index, populated by
# :func:`add`. Lets the retcon / mark_verified write tools
# find a specific edge by its stable id in O(1). The
# subject / object indexes are still authoritative for
# query-style lookups.
edges_by_id: dict[str, Edge] = field(default_factory=dict)
def add(self, edge: Edge) -> None:
self.names.add(edge.subject)
self.names.add(edge.object)
self.edges_by_subject.setdefault(edge.subject, {}).setdefault(
edge.relation, []
).append(edge)
# Maintain the reverse-direction index. We index *every*
# edge by both endpoints so "who points at X" is O(1).
self.edges_by_object.setdefault(edge.object, []).append(edge)
self.edges_by_object.setdefault(edge.subject, []).append(edge)
# Id index. If two edges with the same id are added, the
# second one wins — this is defensive; the rest of the
# graph assumes edge ids are unique.
self.edges_by_id[edge.edge_id] = edge
def find_edge_by_id(self, edge_id: str) -> Optional[Edge]:
"""O(1) lookup by stable per-edge id. Returns ``None`` if
the id isn't in the graph (e.g. the edge was deleted)."""
return self.edges_by_id.get(edge_id)
def add_entity_of_type(self, name: str, type_: str) -> None:
"""Tag an entity into the type index without adding an edge.
Read tools that need to know "what type is X?" or "give me
all X's" populate this directly. ``build_graph`` calls
this for every :class:`Entity` it sees.
"""
self.names.add(name)
self.entities_by_type.setdefault(type_, set()).add(name)
def remove_entity(self, name: str) -> int:
"""Remove ``name`` and every edge that touches it.
Cascades through ``edges_by_subject`` and ``edges_by_object``
so no dangling references remain. Returns the number of
edges removed. Used by the slice-10 ``delete_entity`` tool.
"""
# Collect the ids we're about to drop so we can update
# the ``edges_by_id`` index in one pass.
ids_to_drop = {
e.edge_id
for edges in self.edges_by_subject.values()
for edge_list in edges.values()
for e in edge_list
if e.subject == name or e.object == name
}
# Count first, then strip. We rebuild the indexes without
# ``name`` because Edge is immutable (dataclass) and the
# indexes are plain dicts.
removed = 0
for subject, rel_map in list(self.edges_by_subject.items()):
for rel, edges in list(rel_map.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
removed += len(edges) - len(survivors)
rel_map[rel] = survivors
if not survivors:
del rel_map[rel]
if not rel_map:
del self.edges_by_subject[subject]
for endpoint, edges in list(self.edges_by_object.items()):
survivors = [e for e in edges if e.subject != name and e.object != name]
self.edges_by_object[endpoint] = survivors
if not survivors:
del self.edges_by_object[endpoint]
for eid in ids_to_drop:
self.edges_by_id.pop(eid, None)
self.names.discard(name)
for type_, members in list(self.entities_by_type.items()):
members.discard(name)
if not members:
del self.entities_by_type[type_]
self.aliases.pop(name, None)
return removed
def rename_entity(self, old: str, new: str) -> int:
"""Rename ``old`` → ``new`` in the names set, type index,
and every edge endpoint. Returns the number of edges
re-pointed. Used by the slice-10 ``update_entity`` tool."""
if old == new:
return 0
# Names + types
if old in self.names:
self.names.discard(old)
self.names.add(new)
for type_, members in self.entities_by_type.items():
if old in members:
members.discard(old)
members.add(new)
# Re-point edges. We rebuild edges_by_subject and
# edges_by_object because Edge objects themselves are
# immutable (dataclass) — replacing them is the cleanest
# path. For the POC's typical world size this is fine.
re_pointed = 0
# Subject index: keys are subjects; values are dicts of
# relation → list[Edge]. Some subjects == old.
if old in self.edges_by_subject:
sub_edges = self.edges_by_subject.pop(old)
self.edges_by_subject.setdefault(new, {})
for rel, edges in sub_edges.items():
for e in edges:
e_new = replace(e, subject=new)
self.edges_by_subject[new].setdefault(rel, []).append(e_new)
# Re-key the id index to the replaced edge so
# retcon / mark_verified can still find it.
self.edges_by_id[e.edge_id] = e_new
re_pointed += 1
# Object index: values are list[Edge]; the key is endpoint
# (object name or subject name, since we index both).
new_obj_index: dict[str, list[Edge]] = {}
for endpoint, edges in self.edges_by_object.items():
target = new if endpoint == old else endpoint
for e in edges:
if e.subject == old or e.object == old:
e_new = e
if e.subject == old:
e_new = replace(e_new, subject=new)
if e.object == old:
e_new = replace(e_new, object=new)
new_obj_index.setdefault(target, []).append(e_new)
# Same id index maintenance for the object-side
# rename path.
self.edges_by_id[e.edge_id] = e_new
else:
new_obj_index.setdefault(target, []).append(e)
# Merge: also rebuild subject index for object-side renames
# (edges where old was the object, not the subject).
self.edges_by_object = new_obj_index
# Re-derive edges_by_subject from edges_by_object for any
# edge where old was the *object*.
for endpoint, edges in new_obj_index.items():
for e in edges:
if e.subject == new:
self.edges_by_subject.setdefault(new, {}).setdefault(
e.relation, []
)
# Avoid duplicates from the subject-side rebuild.
if e not in self.edges_by_subject[new][e.relation]:
self.edges_by_subject[new][e.relation].append(e)
# Aliases: the rename preserves the old name as an alias
# pointing at the new canonical. This always happens —
# the world-builder may later call ``merge_entities``
# which relies on the alias being registered to find
# the canonical form via ``by_name``.
old_aliases = self.aliases.pop(old, set())
alias_set = self.aliases.setdefault(new, set())
if old_aliases:
alias_set.update(old_aliases)
alias_set.add(old)
return re_pointed
def resolve_alias(self, alias: str) -> Optional[str]:
"""If ``alias`` is registered as an alternative name for
some canonical entity, return the canonical name. Otherwise
return ``None``. Used by the slice-10 ``set_alias`` /
``update_entity`` read paths."""
low = alias.lower()
for canonical, alias_set in self.aliases.items():
if alias in alias_set or alias.lower() in {a.lower() for a in alias_set}:
return canonical
return None
def by_name(self, name: str) -> Optional[str]:
"""Resolve a name to a canonical form (case-insensitive).
Also follows aliases: if ``name`` matches an alias, the
canonical name is returned.
"""
if name in self.names:
return name
low = name.lower()
for n in self.names:
if n.lower() == low:
return n
aliased = self.resolve_alias(name)
if aliased is not None:
return aliased
return None
def build_graph(entities: Iterable[Entity], triples: Iterable[Triple]) -> Graph:
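
The `by_name` lookup that this hunk moves out of `tools.py` resolves a name in three steps: exact match, case-insensitive match, then alias fallback. A rough sketch of that order as a free function follows; `resolve_name` is a hypothetical name, and the alias "The Raven" is invented for the example.

```python
from __future__ import annotations

from typing import Optional


def resolve_name(
    name: str, names: set[str], aliases: dict[str, set[str]]
) -> Optional[str]:
    """Hypothetical free-function version of the by_name lookup order."""
    # 1. An exact match wins.
    if name in names:
        return name
    # 2. Case-insensitive match against the known names.
    low = name.lower()
    for known in names:
        if known.lower() == low:
            return known
    # 3. Alias fallback, also case-insensitive.
    for canonical, alias_set in aliases.items():
        if low in {a.lower() for a in alias_set}:
            return canonical
    return None


names = {"Roland Raventhorne"}
aliases = {"Roland Raventhorne": {"The Raven"}}

print(resolve_name("Roland Raventhorne", names, aliases))
print(resolve_name("roland raventhorne", names, aliases))
print(resolve_name("the raven", names, aliases))
print(resolve_name("Nobody", names, aliases))
```

Steps 2 and 3 are linear scans, which is acceptable at the POC's scale; a backend with many names would probably want a lowercased index instead.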


@@ -0,0 +1,261 @@
"""Tests for the GraphBackend Protocol (slice 5.1).
These tests validate the contract that any graph backend
(in-memory or Neo4j) must satisfy. Slice 5.1 ships the
Protocol + the ``InMemoryGraph`` implementation (renamed from
``Graph``); slice 5.3+ will add a ``Neo4jGraph`` that must
pass the same contract tests.
The ``Graph is InMemoryGraph`` alias is a back-compat shim
so the 559 existing tests (and any external code that imports
``Graph``) keep working without edits.
"""
from __future__ import annotations
import pytest
from lore_engine_poc.graph_backend import GraphBackend, InMemoryGraph
from lore_engine_poc.tools import Edge, Graph
# ---------------------------------------------------------------------------
# Test 1 — `Graph` is an alias for `InMemoryGraph`
# ---------------------------------------------------------------------------
def test_graph_alias_is_inmemory_graph():
"""The ``Graph`` name in ``lore_engine_poc.tools`` is the same
class as ``InMemoryGraph`` in ``graph_backend``. Zero test churn
across the 559 existing tests that import ``Graph``."""
assert Graph is InMemoryGraph
# ---------------------------------------------------------------------------
# Test 2 — Protocol conformance via isinstance
# ---------------------------------------------------------------------------
def test_inmemory_graph_satisfies_graph_backend_protocol():
"""``isinstance(Graph(), GraphBackend)`` is True. This is the
pattern that ``mcp_server.py:178`` will use to type-check
``self.graph`` once the Protocol is in place."""
assert isinstance(Graph(), GraphBackend)
# ---------------------------------------------------------------------------
# Test 3 — add() populates all 4 indexes
# ---------------------------------------------------------------------------
def _e(s, r, o, **kw):
return Edge(subject=s, relation=r, object=o, **kw)
def test_add_populates_all_indexes():
g = InMemoryGraph()
e = _e("Alice", "MEMBER_OF", "House Raventhorne")
g.add(e)
assert "Alice" in g.names
assert "House Raventhorne" in g.names
assert e in g.edges_by_subject["Alice"]["MEMBER_OF"]
assert e in g.edges_by_object["House Raventhorne"]
assert e in g.edges_by_object["Alice"] # indexed by both endpoints
assert g.edges_by_id[e.edge_id] is e


# ---------------------------------------------------------------------------
# Test 4 — edges_for_subject returns edges in insertion order
# ---------------------------------------------------------------------------


def test_edges_for_subject_returns_insertion_order():
    g = InMemoryGraph()
    e1 = _e("Alice", "MEMBER_OF", "House A")
    e2 = _e("Alice", "MEMBER_OF", "House B")
    g.add(e1)
    g.add(e2)
    assert g.edges_for_subject("Alice", "MEMBER_OF") == [e1, e2]


# ---------------------------------------------------------------------------
# Test 5 — edges_for_object returns incoming edges
# ---------------------------------------------------------------------------


def test_edges_for_object_returns_incoming_edges():
    g = InMemoryGraph()
    e1 = _e("Alice", "MEMBER_OF", "House A")
    e2 = _e("Bob", "MEMBER_OF", "House A")
    g.add(e1)
    g.add(e2)
    incoming = g.edges_for_object("House A")
    assert e1 in incoming
    assert e2 in incoming


# ---------------------------------------------------------------------------
# Test 6 — by_name is case-insensitive
# ---------------------------------------------------------------------------


def test_by_name_case_insensitive():
    g = InMemoryGraph()
    g.names.add("Roland Raventhorne")
    assert g.by_name("roland raventhorne") == "Roland Raventhorne"
    assert g.by_name("ROLAND RAVENTHORNE") == "Roland Raventhorne"


# ---------------------------------------------------------------------------
# Test 7 — add_entity_of_type round-trip
# ---------------------------------------------------------------------------


def test_add_entity_of_type_round_trip():
    g = InMemoryGraph()
    g.add_entity_of_type("Aldric", "Person")
    assert "Aldric" in g.entities_by_type["Person"]
    assert "Aldric" in g.names


# ---------------------------------------------------------------------------
# Test 8 — find_edge_by_id (O(1) id lookup) returns the edge or None
# ---------------------------------------------------------------------------


def test_find_edge_by_id_returns_edge():
    g = InMemoryGraph()
    e = _e("A", "R", "B")
    g.add(e)
    assert g.find_edge_by_id(e.edge_id) is e


def test_find_edge_by_id_returns_none_for_unknown():
    g = InMemoryGraph()
    assert g.find_edge_by_id("nope") is None


# ---------------------------------------------------------------------------
# Test 9 — replace_edge with subject/relation/object change
# ---------------------------------------------------------------------------


def test_replace_edge_subject_relation_object_change_drops_from_old_indexes():
    g = InMemoryGraph()
    old = _e("Alice", "MEMBER_OF", "House A")
    g.add(old)
    # Only the object changes here (House A -> House B); subject and
    # relation stay the same.
    new = _e("Alice", "MEMBER_OF", "House B", edge_id=old.edge_id)
    g.replace_edge(old.edge_id, new)
    # Old endpoint "House A" no longer has the edge in its
    # object index.
    assert old not in g.edges_for_object("House A")
    # New endpoint "House B" does.
    assert new in g.edges_for_object("House B")
    # Id index points at the new edge.
    assert g.find_edge_by_id(old.edge_id) is new


# ---------------------------------------------------------------------------
# Test 10 — replace_edge same subject/relation/object swaps in place
# ---------------------------------------------------------------------------


def test_replace_edge_same_endpoints_swaps_in_place():
    g = InMemoryGraph()
    old = _e("Alice", "MEMBER_OF", "House A")
    g.add(old)
    new = _e("Alice", "MEMBER_OF", "House A",
             valid_from="3rd_age.year_300",
             retcon_at="3rd_age.year_400",
             edge_id=old.edge_id)
    g.replace_edge(old.edge_id, new)
    # Same edge_id, but the id index now holds the new Edge instance.
    assert g.find_edge_by_id(old.edge_id) is new
    assert new in g.edges_for_subject("Alice", "MEMBER_OF")
    assert old not in g.edges_for_subject("Alice", "MEMBER_OF")
    # Only one entry in the subject list.
    assert len(g.edges_for_subject("Alice", "MEMBER_OF")) == 1
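

# ---------------------------------------------------------------------------
# Sketch: in-place swap vs drop + re-add, on a toy index
# ---------------------------------------------------------------------------
# Illustrative only, not one of the 16 contract tests (no ``test_`` prefix,
# so pytest will not collect it). ``ToyEdge``, ``toy_add`` and
# ``toy_replace`` are hypothetical names, and this is NOT the body of
# ``InMemoryGraph.replace_edge``; it is a minimal guess at the behaviour
# that Tests 9 and 10 describe. Assumption: a same-triple replacement keeps
# its list position, while a changed triple is removed and appended again.


def sketch_replace_edge_in_place_vs_drop_and_readd():
    from collections import namedtuple

    ToyEdge = namedtuple("ToyEdge", "edge_id subject relation object note")
    by_id = {}       # edge_id -> edge
    by_subject = {}  # (subject, relation) -> list of edges, insertion order

    def toy_add(edge):
        by_id[edge.edge_id] = edge
        by_subject.setdefault((edge.subject, edge.relation), []).append(edge)

    def toy_replace(old_id, new):
        old = by_id[old_id]  # raises KeyError for an unknown id
        old_key = (old.subject, old.relation)
        new_key = (new.subject, new.relation)
        bucket = by_subject[old_key]
        pos = next(i for i, e in enumerate(bucket) if e is old)
        if old_key == new_key and old.object == new.object:
            bucket[pos] = new  # same triple: swap in place, position kept
        else:
            del bucket[pos]  # triple changed: drop from the old bucket ...
            by_subject.setdefault(new_key, []).append(new)  # ... and re-add
        by_id[old_id] = new

    e1 = ToyEdge("e-1", "Alice", "MEMBER_OF", "House A", "")
    e2 = ToyEdge("e-2", "Alice", "MEMBER_OF", "House B", "")
    toy_add(e1)
    toy_add(e2)
    key = ("Alice", "MEMBER_OF")

    # Same triple, new metadata: e-1 stays first in the list.
    same_triple = e1._replace(note="retconned")
    toy_replace("e-1", same_triple)
    assert by_subject[key] == [same_triple, e2]

    # Object changes: e-1 is dropped and re-added, so it moves to the end.
    moved = same_triple._replace(object="House C")
    toy_replace("e-1", moved)
    assert by_subject[key] == [e2, moved]
    assert by_id["e-1"] is moved
    return by_subject[key]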


# ---------------------------------------------------------------------------
# Test 11 — replace_edge with non-existent id raises
# ---------------------------------------------------------------------------


def test_replace_edge_unknown_id_raises():
    g = InMemoryGraph()
    bogus = _e("Alice", "MEMBER_OF", "House A")
    with pytest.raises(KeyError):
        g.replace_edge("e-deadbeef", bogus)


# ---------------------------------------------------------------------------
# Test 12 — remove_entity cascades
# ---------------------------------------------------------------------------


def test_remove_entity_cascades_through_all_indexes():
    g = InMemoryGraph()
    e = _e("Alice", "MEMBER_OF", "House A")
    g.add(e)
    g.add_entity_of_type("Alice", "Person")
    g.register_alias("Alice", "Ally")
    removed = g.remove_entity("Alice")
    assert removed == 1
    assert "Alice" not in g.names
    assert "Alice" not in g.edges_by_subject
    assert "Alice" not in g.edges_by_object
    assert "Alice" not in g.entities_by_type.get("Person", set())
    assert "Alice" not in g.aliases


# ---------------------------------------------------------------------------
# Test 13 — rename_entity preserves old name as alias
# ---------------------------------------------------------------------------


def test_rename_entity_preserves_old_name_as_alias():
    g = InMemoryGraph()
    e = _e("Alice", "MEMBER_OF", "House A")
    g.add(e)
    g.add_entity_of_type("Alice", "Person")
    g.rename_entity("Alice", "Alicia")
    # by_name resolves both the old and new names.
    assert g.by_name("Alicia") == "Alicia"
    assert g.by_name("Alice") == "Alicia"
    # The edge list under "Alicia" now holds a renamed edge.
    new_edges = g.edges_for_subject("Alicia", "MEMBER_OF")
    assert len(new_edges) == 1
    assert new_edges[0].subject == "Alicia"


# ---------------------------------------------------------------------------
# Test 14 — register_alias + by_name
# ---------------------------------------------------------------------------


def test_register_alias_and_resolve_via_by_name():
    g = InMemoryGraph()
    g.names.add("Roland Raventhorne")
    g.register_alias("Roland Raventhorne", "Sir Roland")
    assert g.by_name("Sir Roland") == "Roland Raventhorne"
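

# ---------------------------------------------------------------------------
# Sketch: case-insensitive name and alias resolution on toy data
# ---------------------------------------------------------------------------
# Illustrative only, not one of the 16 contract tests (no ``test_`` prefix,
# so pytest will not collect it). ``toy_by_name`` is a hypothetical helper,
# not ``InMemoryGraph.by_name``. It shows one way the lookups in Tests 6 and
# 14 could resolve. Assumption: aliases are matched case-insensitively too,
# which the contract tests above do not pin down.


def sketch_case_insensitive_name_and_alias_lookup():
    names = {"Roland Raventhorne"}
    aliases = {"Sir Roland": "Roland Raventhorne"}  # alias -> canonical name

    def toy_by_name(query):
        folded = query.casefold()
        for name in names:  # canonical names take priority
            if name.casefold() == folded:
                return name
        for alias, canonical in aliases.items():
            if alias.casefold() == folded:
                return canonical
        return None  # unknown names resolve to None, as in Test 15

    return (
        toy_by_name("ROLAND RAVENTHORNE"),
        toy_by_name("sir roland"),
        toy_by_name("nobody"),
    )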


# ---------------------------------------------------------------------------
# Test 15 — empty graph: every method returns the empty answer
# ---------------------------------------------------------------------------


def test_empty_graph_methods_return_empty():
    g = InMemoryGraph()
    assert g.edges_for_subject("anyone", "anything") == []
    assert g.edges_for_object("anything") == []
    assert g.by_name("anyone") is None
    assert g.find_edge_by_id("e-anything") is None
    assert g.remove_entity("anyone") == 0
    assert g.rename_entity("a", "b") == 0
    # entities_of_type returns an empty set (not None) for unknown
    # types — callers want to iterate without a None check.
    assert g.entities_of_type("UnknownType") == set()
    # all_names is an empty set.
    assert g.all_names() == set()