Files
Lore Engine Dev 5a8bf67afb slice 6.2: GraphBackend Protocol + InMemoryGraph for Setting/Plane/EXISTS_IN
Extends the GraphBackend Protocol with 8 new methods (add_setting,
find_setting, add_plane, find_plane, planes_in_setting,
add_exists_in, entity_planes, setting_entities). InMemoryGraph
implements them with O(1) reverse lookups (planes_by_setting,
entities_by_setting, settings_by_entity). Neo4jGraph gains
NotImplementedError stubs so isinstance(neo4j, GraphBackend) keeps
passing until the slice 6 follow-up mirrors the Cypher.

EXISTS_IN is the timeless type-assertion per docs/17-planes.md;
time-bounded membership is the slice 6.5 reified :Relation work.

+8 tests (718 → 726). All green. No regressions.
2026-06-19 12:59:16 -04:00

650 lines
27 KiB
Python

"""Neo4j 5 graph backend (slices 5.3 + 5.4 + 5.5, plus 6.2 stubs).

This module ships ``Neo4jGraph`` — a ``GraphBackend``-conformant
Neo4j implementation that mirrors ``InMemoryGraph`` from
:mod:`lore_engine_poc.graph_backend`.

The Cypher shape follows ADR 0009 (reified ``:Relation`` nodes).
Every entity is an ``:Entity`` node keyed by ``name_lower``; its
type(s) are extra labels on the same node::

    (:Entity:Person {name, name_lower})-[:FROM]->(:Relation
        {edge_id, type, valid_from, valid_until,
         sources, extraction_confidences, source_confidences,
         reliabilities, confidence, is_disputed})-[:TO]->
        (:Entity:Faction {name, name_lower})
    (:Relation)-[:SOURCED_FROM]->(:LoreSource {path, ...})
    (:Alias {alias, alias_lower})-[:CANONICAL_OF]->(:Entity)

Each ``:Relation`` node also carries the slice 10.2 audit fields
(``retcon_at``, ``retcon_note``, ``verified_by``, ``verified_at``,
``verified_note``) so the 12 write tools and the ``retcon`` /
``mark_verified`` chokepoints work uniformly across backends.
``is_disputed`` is a property on the node; the ``disputed_with`` list
is intended to be modelled as a ``:DISPUTES`` edge to the sibling
:Relation nodes (slice 5.5).
NOTE(review): no method in this module writes ``:DISPUTES`` edges —
confirm where slice 5.5 creates them.

Slices:
    5.3 — Skeleton: connect, ensure_schema, by_name, all_names,
          all_entity_types, entities_of_type, lore_source,
          find_edge_by_id (None), add_entity_of_type, register_name,
          register_alias, add_lore_source. Stubs for the rest.
    5.4 — Reified :Relation reads; edges_for_subject, edges_for_object,
          add (full edge upsert), replace_edge, remove_entity,
          remove_entity_of_type, rename_entity. Read-tool parity
          with InMemoryGraph.
    5.5 — Full write-tool surface (12 write tools mirrored).
    6.2 — Setting / Plane / EXISTS_IN methods exist only as
          ``NotImplementedError`` stubs so the ``GraphBackend``
          isinstance check keeps passing.
"""
from __future__ import annotations

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from .tools import Edge, LoreSource
    from .setting import Plane, Setting
def _edge_node_props(edge: "Edge") -> dict:
"""Serialize an :class:`Edge` to the :Relation node's property bag.
The properties mirror the Edge dataclass field-for-field so a
round-trip ``find_edge_by_id`` returns a structurally identical
Edge object.
"""
return {
"edge_id": edge.edge_id,
"type": edge.relation,
"valid_from": edge.valid_from,
"valid_until": edge.valid_until,
"sources": list(edge.sources),
"extraction_confidences": list(edge.extraction_confidences),
"source_confidences": list(edge.source_confidences),
"reliabilities": list(edge.reliabilities),
"confidence": edge.confidence,
"is_disputed": edge.is_disputed,
"retcon_at": edge.retcon_at,
"retcon_note": edge.retcon_note,
"verified_by": edge.verified_by,
"verified_at": edge.verified_at,
"verified_note": edge.verified_note,
}
def _edge_from_record(record) -> "Edge":
    """Build an :class:`Edge` from one row of an edge-returning query.

    ``record`` must expose the column aliases used by the edge queries
    (``subject``, ``type``, ``object``, ``edge_id`` and one column per
    :Relation property). Null list columns become empty lists, a null
    ``confidence`` becomes ``1.0`` and ``is_disputed`` is coerced to
    ``bool``; every other column is passed through unchanged.
    """
    from .tools import Edge

    def as_list(column: str) -> list:
        return list(record[column] or [])

    stored_confidence = record["confidence"]
    fields = dict(
        subject=record["subject"],
        relation=record["type"],
        object=record["object"],
        edge_id=record["edge_id"],
        valid_from=record["valid_from"],
        valid_until=record["valid_until"],
        sources=as_list("sources"),
        extraction_confidences=as_list("extraction_confidences"),
        source_confidences=as_list("source_confidences"),
        reliabilities=as_list("reliabilities"),
        confidence=1.0 if stored_confidence is None else stored_confidence,
        is_disputed=bool(record["is_disputed"]),
    )
    # Audit fields (slice 10.2) round-trip verbatim, nulls included.
    for audit_column in (
        "retcon_at",
        "retcon_note",
        "verified_by",
        "verified_at",
        "verified_note",
    ):
        fields[audit_column] = record[audit_column]
    return Edge(**fields)
def _sanitize_label(label: str) -> str:
"""Strip non-[A-Za-z0-9_] from a dynamic label so SET n:Label
is safe to interpolate.
"""
cleaned = "".join(
ch if (ch.isalnum() or ch == "_") else "_" for ch in label
)
if not cleaned or not cleaned[0].isalpha():
cleaned = "L_" + cleaned
return cleaned
class Neo4jGraph:
    """Neo4j 5 implementation of the ``GraphBackend`` Protocol.

    The Cypher shape is documented at the module level.
    ``InMemoryGraph`` is the reference implementation; this class
    re-expresses each of its methods as Cypher against the reified
    ``:Relation`` shape.

    Entities are ``:Entity`` nodes keyed by ``name_lower``; an entity's
    types are extra labels on the same node (sanitized by
    ``_sanitize_label``). Every public method opens its own session.
    """

    # Uniqueness constraints created by ``ensure_schema``.
    _SCHEMA_STATEMENTS = (
        "CREATE CONSTRAINT entity_name_lower IF NOT EXISTS "
        "FOR (n:Entity) REQUIRE n.name_lower IS UNIQUE",
        "CREATE CONSTRAINT loresource_path IF NOT EXISTS "
        "FOR (n:LoreSource) REQUIRE n.path IS UNIQUE",
        "CREATE CONSTRAINT relation_edge_id IF NOT EXISTS "
        "FOR (n:Relation) REQUIRE n.edge_id IS UNIQUE",
        "CREATE CONSTRAINT alias_alias_lower IF NOT EXISTS "
        "FOR (n:Alias) REQUIRE n.alias_lower IS UNIQUE",
    )

    # RETURN clause shared by every query whose rows feed
    # ``_edge_from_record``. The query must bind ``s`` (subject
    # :Entity), ``r`` (:Relation) and ``o`` (object :Entity). The
    # trailing space lets callers append ``ORDER BY ...``.
    _EDGE_RETURN = (
        "RETURN s.name AS subject, r.type AS type, o.name AS object, "
        "r.edge_id AS edge_id, r.valid_from AS valid_from, "
        "r.valid_until AS valid_until, "
        "r.sources AS sources, "
        "r.extraction_confidences AS extraction_confidences, "
        "r.source_confidences AS source_confidences, "
        "r.reliabilities AS reliabilities, "
        "r.confidence AS confidence, "
        "r.is_disputed AS is_disputed, "
        "r.retcon_at AS retcon_at, r.retcon_note AS retcon_note, "
        "r.verified_by AS verified_by, "
        "r.verified_at AS verified_at, "
        "r.verified_note AS verified_note "
    )

    def __init__(self, uri: str, *, database: str = "neo4j", auth=None):
        """Open a driver for ``uri`` and verify connectivity eagerly.

        Raises whatever ``verify_connectivity`` raises. In that case the
        driver is closed first, so no connection pool is leaked.
        """
        from neo4j import GraphDatabase

        self._driver = GraphDatabase.driver(uri, auth=auth)
        self._uri = uri
        self._database = database
        # Eagerly verify the connection so the loader fails loudly
        # at startup rather than on the first query. Otherwise the
        # driver pool opens sockets lazily, and a bad URI would first
        # show up on the first ``session.run(...)`` — too late for the
        # slice 5.7 entry scripts to log a clear error.
        try:
            self._driver.verify_connectivity()
        except Exception:
            # The caller never receives a handle to close, so release
            # the driver here before propagating the error.
            self._driver.close()
            raise

    # -- Lifecycle --------------------------------------------------------
    def close(self) -> None:
        """Close the underlying driver and its connection pool."""
        self._driver.close()

    def _session(self):
        """Open a session against the configured database."""
        return self._driver.session(database=self._database)

    def ensure_schema(self) -> None:
        """Create the uniqueness constraints. Idempotent (``IF NOT EXISTS``).

        * Uniqueness on ``(name_lower)`` for ``:Entity``.
        * Uniqueness on ``(path)`` for ``:LoreSource``.
        * Uniqueness on ``(edge_id)`` for ``:Relation``.
        * Uniqueness on ``(alias_lower)`` for ``:Alias``.
        """
        with self._session() as session:
            for statement in self._SCHEMA_STATEMENTS:
                session.run(statement)

    # -- Read methods -----------------------------------------------------
    def by_name(self, name: str) -> Optional[str]:
        """Resolve ``name`` (case-insensitive) to a canonical entity name.

        A direct :Entity match wins over an :Alias match. Returns
        ``None`` for an empty name or when nothing matches.
        """
        if not name:
            return None
        nl = name.lower()
        with self._session() as session:
            record = session.run(
                "MATCH (n:Entity {name_lower: $nl}) RETURN n.name AS n",
                nl=nl,
            ).single()
            if record is None:
                record = session.run(
                    "MATCH (a:Alias {alias_lower: $nl})-[:CANONICAL_OF]->"
                    "(e:Entity) RETURN e.name AS n",
                    nl=nl,
                ).single()
        return None if record is None else record["n"]

    def all_names(self) -> set[str]:
        """Return the display name of every :Entity node."""
        with self._session() as session:
            result = session.run("MATCH (n:Entity) RETURN n.name AS n")
            return {record["n"] for record in result}

    def all_entity_types(self) -> list[str]:
        """Return every type label in use, excluding the base ``Entity``.

        The labels are the *sanitized* forms written by
        ``add_entity_of_type``. They are sorted so the answer is
        deterministic.
        """
        with self._session() as session:
            result = session.run(
                "MATCH (n:Entity) "
                "UNWIND labels(n) AS l "
                "WITH l "
                "WHERE l <> 'Entity' "
                "RETURN DISTINCT l AS l "
                "ORDER BY l"
            )
            return [record["l"] for record in result]

    def entities_of_type(self, type_: str) -> set[str]:
        """Return the names of :Entity nodes that carry the ``type_`` label."""
        label = _sanitize_label(type_)
        # Restrict to :Entity so a type named e.g. "Relation" or
        # "LoreSource" cannot pick up the bookkeeping nodes.
        cypher = f"MATCH (n:Entity:`{label}`) RETURN n.name AS n"
        with self._session() as session:
            result = session.run(cypher)
            return {record["n"] for record in result}

    def lore_source(self, path: str) -> Optional["LoreSource"]:
        """Load the :LoreSource at ``path``, or ``None`` if it is unknown.

        Missing or empty string fields fall back to the ``LoreSource``
        defaults used here ("", "prose", "canonical", ""). A missing
        ``source_confidence`` falls back to ``1.0``; a stored ``0.0`` is
        preserved.
        """
        from .tools import LoreSource

        with self._session() as session:
            record = session.run(
                "MATCH (n:LoreSource {path: $path}) "
                "RETURN n.path AS path, n.name AS name, "
                "n.source_type AS source_type, "
                "n.reliability AS reliability, "
                "n.source_confidence AS source_confidence, "
                "n.ingested_at AS ingested_at",
                path=path,
            ).single()
        if record is None:
            return None
        confidence = record["source_confidence"]
        return LoreSource(
            path=record["path"],
            name=record["name"] or "",
            source_type=record["source_type"] or "prose",
            reliability=record["reliability"] or "canonical",
            # ``or 1.0`` would silently turn a stored 0.0 into 1.0.
            source_confidence=1.0 if confidence is None else confidence,
            ingested_at=record["ingested_at"] or "",
        )

    def find_edge_by_id(self, edge_id: str) -> Optional["Edge"]:
        """Return the edge with ``edge_id``, or ``None`` if absent.

        Only a :Relation connected to both a subject and an object
        :Entity is found.
        """
        cypher = (
            "MATCH (s:Entity)-[:FROM]->(r:Relation {edge_id: $eid})"
            "-[:TO]->(o:Entity) " + self._EDGE_RETURN
        )
        with self._session() as session:
            record = session.run(cypher, eid=edge_id).single()
        return None if record is None else _edge_from_record(record)

    def edges_for_subject(
        self, name: str, relation: Optional[str] = None
    ) -> list["Edge"]:
        """Return all edges where ``name`` (case-insensitive) is the subject.

        When ``relation`` is given, only edges of that relation type are
        returned. Results are ordered by ``edge_id``, so the answer is
        deterministic. This is *not* insertion order; it matches
        insertion order only if edge ids sort that way.
        """
        params = {"nl": name.lower()}
        type_filter = ""
        if relation is not None:
            type_filter = " {type: $rel}"
            params["rel"] = relation
        cypher = (
            "MATCH (s:Entity {name_lower: $nl})-[:FROM]->"
            f"(r:Relation{type_filter})-[:TO]->(o:Entity) "
            + self._EDGE_RETURN
            + "ORDER BY r.edge_id"
        )
        with self._session() as session:
            result = session.run(cypher, **params)
            return [_edge_from_record(r) for r in result]

    def edges_for_object(self, name: str) -> list["Edge"]:
        """Return edges where ``name`` is the object (incoming edges).

        Results are ordered by ``edge_id``.
        """
        cypher = (
            "MATCH (s:Entity)-[:FROM]->(r:Relation)-[:TO]->"
            "(o:Entity {name_lower: $nl}) "
            + self._EDGE_RETURN
            + "ORDER BY r.edge_id"
        )
        with self._session() as session:
            result = session.run(cypher, nl=name.lower())
            return [_edge_from_record(r) for r in result]

    # -- Write methods ----------------------------------------------------
    def add_entity_of_type(self, name: str, type_: str) -> None:
        """Ensure entity ``name`` exists and carries the ``type_`` label."""
        label = _sanitize_label(type_)
        with self._session() as session:
            session.run(
                "MERGE (n:Entity {name_lower: $nl}) "
                "ON CREATE SET n.name = $name "
                f"SET n:`{label}`",
                nl=name.lower(),
                name=name,
            )

    def register_name(self, name: str) -> None:
        """Ensure an :Entity for ``name`` exists.

        The first registration's spelling becomes the display name.
        """
        with self._session() as session:
            session.run(
                "MERGE (n:Entity {name_lower: $nl}) "
                "ON CREATE SET n.name = $name",
                nl=name.lower(),
                name=name,
            )

    def register_alias(self, canonical: str, alias: str) -> None:
        """Point ``alias`` at ``canonical``, creating the entity if needed.

        An alias that previously pointed at a different entity is
        re-pointed rather than linked to both entities.
        """
        cnl = canonical.lower()
        with self._session() as session:
            session.run(
                "MERGE (e:Entity {name_lower: $cnl}) "
                "ON CREATE SET e.name = $canonical",
                cnl=cnl,
                canonical=canonical,
            )
            self._attach_alias(session, canonical_lower=cnl, alias=alias)

    def add_lore_source(self, source: "LoreSource") -> None:
        """Register ``source``, keyed by its ``path``.

        A new path is created with ``source``'s metadata. An existing
        node keeps its metadata, unless it is a placeholder created by
        ``add`` for an edge's source path (``source_type = ''``). In
        that case the real metadata fills it in; otherwise a source first
        seen via an edge could never receive its name, type or
        reliability.
        """
        props = {
            "name": source.name,
            "source_type": source.source_type,
            "reliability": source.reliability,
            "source_confidence": source.source_confidence,
            "ingested_at": source.ingested_at,
        }
        with self._session() as session:
            session.run(
                "MERGE (n:LoreSource {path: $path}) "
                "ON CREATE SET n += $props "
                "ON MATCH SET n += CASE WHEN n.source_type = '' "
                "THEN $props ELSE {} END",
                path=source.path,
                props=props,
            )

    def add(self, edge: "Edge") -> None:
        """Upsert a single edge in the reified ``:Relation`` shape.

        Creates the subject and object :Entity nodes (if missing),
        upserts the :Relation node keyed by ``edge_id``, and links
        :FROM / :TO, all in one statement. Then links :SOURCED_FROM to
        a :LoreSource for every path in ``edge.sources``, creating
        placeholder sources as needed.

        Raises:
            TypeError: ``edge`` is not an ``Edge``.
        """
        from .tools import Edge

        if not isinstance(edge, Edge):
            raise TypeError(f"expected Edge, got {type(edge).__name__}")
        with self._session() as session:
            session.run(
                "MERGE (s:Entity {name_lower: $snl}) "
                "ON CREATE SET s.name = $sname "
                "MERGE (o:Entity {name_lower: $onl}) "
                "ON CREATE SET o.name = $oname "
                "MERGE (r:Relation {edge_id: $eid}) "
                "SET r += $props "
                "MERGE (s)-[:FROM]->(r) "
                "MERGE (r)-[:TO]->(o)",
                snl=edge.subject.lower(),
                sname=edge.subject,
                onl=edge.object.lower(),
                oname=edge.object,
                eid=edge.edge_id,
                props=_edge_node_props(edge),
            )
            self._link_sources(session, edge.edge_id, edge.sources)

    def replace_edge(self, old_id: str, new_edge: "Edge") -> None:
        """Swap the edge ``old_id`` for ``new_edge``.

        Two cases (mirrors the InMemoryGraph contract):
          1. Same endpoints (subject and object, compared
             case-insensitively): the :Relation node's properties are
             updated in place, the :FROM / :TO links are kept, and the
             :SOURCED_FROM links are synced to ``new_edge.sources``.
          2. Different endpoints: the old :Relation is detach-deleted
             and ``new_edge`` is added fresh via ``add``.

        In both cases the resulting node carries ``new_edge.edge_id``.
        Retcon's audit semantics rely on that being equal to ``old_id``.

        Raises:
            KeyError: no connected edge has id ``old_id``.
        """
        with self._session() as session:
            old = session.run(
                "MATCH (s:Entity)-[:FROM]->(r:Relation {edge_id: $eid})"
                "-[:TO]->(o:Entity) "
                "RETURN s.name_lower AS snl, o.name_lower AS onl",
                eid=old_id,
            ).single()
            if old is None:
                raise KeyError(f"no edge with id {old_id!r}")
            same_endpoints = (
                old["snl"] == new_edge.subject.lower()
                and old["onl"] == new_edge.object.lower()
            )
            if same_endpoints:
                # Update in place, then drop :SOURCED_FROM links to paths
                # that are no longer listed.
                session.run(
                    "MATCH (r:Relation {edge_id: $eid}) "
                    "SET r += $props "
                    "WITH r "
                    "OPTIONAL MATCH (r)-[stale:SOURCED_FROM]->(ls:LoreSource) "
                    "WHERE NOT ls.path IN $paths "
                    "DELETE stale",
                    eid=old_id,
                    props=_edge_node_props(new_edge),
                    paths=list(new_edge.sources),
                )
                # ``SET r += $props`` wrote new_edge.edge_id onto the node.
                self._link_sources(
                    session, new_edge.edge_id, new_edge.sources
                )
                return
            session.run(
                "MATCH (r:Relation {edge_id: $eid}) DETACH DELETE r",
                eid=old_id,
            )
        # add() opens its own session and MERGEs on edge_id, recreating
        # the node with the new endpoints.
        self.add(new_edge)

    def remove_entity(self, name: str) -> int:
        """Cascade-remove an entity.

        Deletes the :Entity node, every :Relation it is the subject or
        object of, and every :Alias pointing at it, in one statement.
        :LoreSource nodes are kept.

        Returns:
            The number of subject-direction plus object-direction edges
            that were attached (a self-loop counts in both directions).
            Returns 0 if no such entity exists.
        """
        with self._session() as session:
            record = session.run(
                "MATCH (e:Entity {name_lower: $nl}) "
                "OPTIONAL MATCH (e)-[:FROM]->(out_rel:Relation) "
                "WITH e, count(DISTINCT out_rel) AS out_count "
                "OPTIONAL MATCH (in_rel:Relation)-[:TO]->(e) "
                "WITH e, out_count, count(DISTINCT in_rel) AS in_count "
                "WITH e, out_count + in_count AS total "
                "OPTIONAL MATCH (e)-[:FROM|TO]-(rel:Relation) "
                "WITH e, total, collect(DISTINCT rel) AS rels "
                "OPTIONAL MATCH (al:Alias)-[:CANONICAL_OF]->(e) "
                "WITH e, total, rels, collect(DISTINCT al) AS aliases "
                "FOREACH (r IN rels | DETACH DELETE r) "
                "FOREACH (a IN aliases | DETACH DELETE a) "
                "DETACH DELETE e "
                "RETURN total",
                nl=name.lower(),
            ).single()
        # No row means the entity did not exist.
        return 0 if record is None else record["total"]

    def remove_entity_of_type(self, name: str, type_: str) -> None:
        """Remove the ``type_`` label from entity ``name``.

        Removing the base ``Entity`` label is refused (no-op). Doing so
        would hide the node from every :Entity query and from the
        ``name_lower`` uniqueness constraint.
        """
        label = _sanitize_label(type_)
        if label == "Entity":
            return
        with self._session() as session:
            session.run(
                "MATCH (n:Entity {name_lower: $nl}) "
                f"REMOVE n:`{label}`",
                nl=name.lower(),
            )

    def rename_entity(self, old: str, new: str) -> int:
        """Rename ``old`` to ``new`` and keep ``old`` as an alias of ``new``.

        The :FROM / :TO links stay attached to the renamed node.

        Returns:
            The count of :Relation nodes whose subject or object was
            ``old`` (subject and object directions counted separately).
            Returns 0, with no writes, if ``old`` does not exist.
        """
        onl = old.lower()
        nnl = new.lower()
        with self._session() as session:
            # Count and rename in one statement so they agree.
            record = session.run(
                "MATCH (e:Entity {name_lower: $onl}) "
                "OPTIONAL MATCH (e)-[:FROM]->(out_rel:Relation) "
                "WITH e, count(DISTINCT out_rel) AS out_count "
                "OPTIONAL MATCH (in_rel:Relation)-[:TO]->(e) "
                "WITH e, out_count, count(DISTINCT in_rel) AS in_count "
                "SET e.name = $new, e.name_lower = $nnl "
                "RETURN out_count + in_count AS total",
                onl=onl,
                new=new,
                nnl=nnl,
            ).single()
            if record is None:
                return 0
            # Keep the old name resolvable via by_name / resolve_alias.
            self._attach_alias(session, canonical_lower=nnl, alias=old)
        return record["total"]

    def resolve_alias(self, alias: str) -> Optional[str]:
        """Return the canonical name ``alias`` points at, or ``None``."""
        with self._session() as session:
            record = session.run(
                "MATCH (a:Alias {alias_lower: $al})-[:CANONICAL_OF]->"
                "(e:Entity) RETURN e.name AS n",
                al=alias.lower(),
            ).single()
        return None if record is None else record["n"]

    # -- Write helpers ----------------------------------------------------
    @staticmethod
    def _link_sources(session, edge_id: str, paths) -> None:
        """MERGE a :SOURCED_FROM link from :Relation ``edge_id`` to each path.

        Unknown paths get a placeholder :LoreSource (``source_type = ''``),
        which ``add_lore_source`` later fills in.
        """
        paths = list(paths)
        if not paths:
            return
        session.run(
            "MATCH (r:Relation {edge_id: $eid}) "
            "UNWIND $paths AS src_path "
            "MERGE (ls:LoreSource {path: src_path}) "
            "ON CREATE SET ls.name = src_path, "
            "ls.source_type = '', ls.reliability = 'canonical', "
            "ls.source_confidence = 1.0, ls.ingested_at = '' "
            "MERGE (r)-[:SOURCED_FROM]->(ls)",
            eid=edge_id,
            paths=paths,
        )

    @staticmethod
    def _attach_alias(session, canonical_lower: str, alias: str) -> None:
        """Point :Alias ``alias`` at the :Entity keyed by ``canonical_lower``.

        Any existing :CANONICAL_OF link from the alias to a different
        entity is deleted first, so ``by_name`` never sees two targets.
        Does nothing if the entity does not exist.
        """
        session.run(
            "MATCH (e:Entity {name_lower: $cnl}) "
            "MERGE (a:Alias {alias_lower: $anl}) "
            "ON CREATE SET a.alias = $alias "
            "WITH a, e "
            "OPTIONAL MATCH (a)-[stale:CANONICAL_OF]->(other:Entity) "
            "WHERE other <> e "
            "DELETE stale "
            "WITH DISTINCT a, e "
            "MERGE (a)-[:CANONICAL_OF]->(e)",
            cnl=canonical_lower,
            anl=alias.lower(),
            alias=alias,
        )

    # -- slice 6.2: Setting + Plane + EXISTS_IN (Layer 1) --------------
    # The Neo4j substrate is not yet wired for the slice 6
    # Setting/Plane model. These stubs raise a clear error if a caller
    # ever invokes them; a follow-up sub-slice ships the Cypher mirror
    # and the constraint setup. Until then the in-memory backend
    # (default) is the canonical substrate for slice 6 work — see
    # docs/plan/exec/06-planes.md.
    #
    # The stubs exist so ``isinstance(Neo4jGraph(...), GraphBackend)``
    # keeps passing the structural check after slice 6.2's Protocol
    # additions. Without them, Neo4jGraph would silently stop being a
    # GraphBackend, which the read-tool dispatch relies on.
    @staticmethod
    def _slice6_pending(method: str) -> NotImplementedError:
        """Build the error raised by the slice 6.2 stubs."""
        return NotImplementedError(
            f"Neo4jGraph.{method}: slice 6 follow-up; "
            "see docs/plan/exec/06-planes.md sub-slice 6.2."
        )

    def add_setting(self, setting: "Setting") -> None:
        raise self._slice6_pending("add_setting")

    def find_setting(self, setting_id: str) -> Optional["Setting"]:
        raise self._slice6_pending("find_setting")

    def add_plane(self, plane: "Plane") -> None:
        raise self._slice6_pending("add_plane")

    def find_plane(self, plane_id: str) -> Optional["Plane"]:
        raise self._slice6_pending("find_plane")

    def planes_in_setting(self, setting_id: str) -> list:
        raise self._slice6_pending("planes_in_setting")

    def add_exists_in(self, *, entity_id: str, setting_id: str) -> None:
        raise self._slice6_pending("add_exists_in")

    def entity_planes(self, entity_id: str) -> list:
        raise self._slice6_pending("entity_planes")

    def setting_entities(self, setting_id: str) -> set:
        raise self._slice6_pending("setting_entities")