After the in-memory graph + pickle are written, the new flag
mirrors the full graph into the Neo4j 5 container at
$LORE_NEO4J_URI (default bolt://127.0.0.1:7687). The flag
is opt-in (default off) so the existing test suite's
invocations of 01_ingest.py without Docker still work.
The mirror logic:
* Pre-pass for LoreSource nodes (full metadata via
add_lore_source so SOURCED_FROM links find them with
name, source_type, reliability, source_confidence).
* Pre-pass for bare names (entities registered without
any edge participation — keeps :Entity count in sync
with in-memory all_names()).
* Then the edges, add()-ed one by one.
Failure semantics:
* Neo4j unreachable at startup → log + exit 3.
* neo4j_graph not importable → log + exit 2.
* Pickle is always written before the mirror attempt, so
a flaky Neo4j container never loses the in-memory state.
Consistency runner stability:
_detect_contradictions Pattern 2 (same object, different
subjects) now sorts the two claims alphabetically so
claim_a / claim_b are stable across runs. The
graph.all_names() set iteration order is otherwise
non-deterministic across Python processes and across
the in-memory / Neo4j backends, and the original
dict-iteration insertion order broke when slice 5.4
migrated to all_names().
Tests:
* tests/test_scripts/test_ingest_neo4j.py — 5 docker-gated
tests (exits zero, entity count, relation count,
default-off untouched, fails loud on unreachable URI).
* tests/test_consistency/test_runner_categories.py — one
test updated to assert claim_a/claim_b as a set rather
than a specific order (matches the runner's new
lexicographic-sort contract).
Suite: 619 -> 624 passed (+5 ingest-neo4j tests, all 559
baseline + 32 Neo4j + consistency + ingest tests preserved).
473 lines
19 KiB
Python
473 lines
19 KiB
Python
"""Lore Engine POC — consistency runner (slice 2.1).
|
|
|
|
The :class:`ConsistencyRunner` walks an in-memory :class:`Graph`
|
|
and produces a :class:`ConsistencyRun` plus a list of violations.
|
|
It implements Categories A, B, and D (Contradiction, Anachronism,
|
|
Orphan) directly. Category C (Ontology rules) lives in
|
|
:mod:`lore_engine_poc.ontology_rules` and is wired in slice 2.2.
|
|
|
|
The runner is **read-only**: it never mutates the graph. The
|
|
``run()`` call returns a fresh :class:`ConsistencyRun` and stashes
|
|
the violation list on ``runner.last_violations`` for inspection
|
|
(MCP tools read it from there). The runner also keeps a tiny
|
|
history so ``latest_run()`` can return the most recent run.
|
|
|
|
Detection conventions (slice 2.1 POC):
|
|
|
|
* **Person lifespan** is inferred from MEMBER_OF(Lineage) edges:
|
|
``birth = min(valid_from)`` across all MEMBER_OF edges,
|
|
``death = max(valid_until)`` across all MEMBER_OF edges
|
|
(``None`` when the Person is still living). A Person with no
|
|
MEMBER_OF edges has no inferred lifespan and is skipped for
|
|
anachronism detection.
|
|
* **Contradiction (Category A):** two edges on the same
|
|
``(subject, relation)`` whose ``time_in_window``-style overlap
|
|
check passes and whose ``object`` differs.
|
|
* **Anachronism (Category B):** for each ``PARTICIPATED_IN`` /
|
|
``RULED`` / ``LOCATED_IN`` edge of a Person with a known
|
|
lifespan, check the edge's ``valid_from`` against the Person's
|
|
inferred birth, and ``valid_until`` against the Person's
|
|
inferred death. Out-of-window edges become Anachronism nodes.
|
|
* **Orphan (Category D):** an entity with no edges at all is an
|
|
Orphan. The ``reason`` field uses a pre-baked vocabulary
|
|
(``docs/04-consistency.md`` §Category D) so a UI can group on
|
|
it.
|
|
|
|
The runner is designed for in-memory POC; slice 2.6 will wrap the
|
|
same logic against Cognee.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from typing import Iterable, Optional
|
|
|
|
from .consistency import (
|
|
Anachronism,
|
|
Contradiction,
|
|
ConsistencyRun,
|
|
Orphan,
|
|
_utc_now_iso,
|
|
)
|
|
from .time_model import time_in_window
|
|
from .tools import Edge, Graph
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Result holder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class RunnerResult:
|
|
"""A consistency run's full output: the summary node + the
|
|
violation list. The runner returns the summary node from
|
|
``run()`` and stashes the violation list on
|
|
``runner.last_violations``.
|
|
"""
|
|
|
|
run: ConsistencyRun
|
|
violations: list = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _infer_person_lifespan(edges: Iterable[Edge]) -> tuple[Optional[str], Optional[str]]:
|
|
"""Infer a Person's birth/death from their MEMBER_OF(Lineage) edges.
|
|
|
|
Returns ``(birth, death)``. ``birth`` is the minimum
|
|
``valid_from`` across all MEMBER_OF edges; ``death`` is the
|
|
maximum ``valid_until``. Either may be ``None`` (unbounded
|
|
Person, or no MEMBER_OF edges at all).
|
|
"""
|
|
birth: Optional[str] = None
|
|
death: Optional[str] = None
|
|
for e in edges:
|
|
if e.relation != "MEMBER_OF":
|
|
continue
|
|
if e.valid_from is not None:
|
|
if birth is None or _time_lt(e.valid_from, birth):
|
|
birth = e.valid_from
|
|
if e.valid_until is not None:
|
|
if death is None or _time_gt(e.valid_until, death):
|
|
death = e.valid_until
|
|
return birth, death
|
|
|
|
|
|
def _time_lt(a: str, b: str) -> bool:
|
|
"""``a < b`` in the canonical time tree."""
|
|
from .time_model import _cmp_atoms
|
|
return _cmp_atoms(a, b) < 0
|
|
|
|
|
|
def _time_gt(a: str, b: str) -> bool:
|
|
"""``a > b`` in the canonical time tree."""
|
|
from .time_model import _cmp_atoms
|
|
return _cmp_atoms(a, b) > 0
|
|
|
|
|
|
def _violation_min_source_confidence(violation) -> float:
|
|
"""The minimum ``source_confidence`` across a violation's sources.
|
|
|
|
Used by the ``confidence_threshold`` config knob: a violation
|
|
whose minimum source confidence is below the threshold is
|
|
suppressed. The :class:`Anachronism` and
|
|
:class:`Contradiction` dataclasses carry a
|
|
``source_confidences`` tuple parallel to ``sources``; other
|
|
violation types default to 1.0 (high confidence).
|
|
"""
|
|
confidences = getattr(violation, "source_confidences", ())
|
|
if not confidences:
|
|
return 1.0
|
|
return min(confidences)
|
|
|
|
|
|
def _edge_window_overlap(a: Edge, b: Edge) -> bool:
|
|
"""Two edges' time windows overlap (or one is unbounded).
|
|
|
|
The test is the standard interval-overlap question: do
|
|
``[a_from, a_until]`` and ``[b_from, b_until]`` share any
|
|
moment? Unbounded on either side is treated as "open in that
|
|
direction".
|
|
|
|
Edge cases:
|
|
- Both fully unbounded → overlap (trivially).
|
|
- One fully unbounded, one bounded → overlap.
|
|
- Both bounded: overlap iff ``a_from <= b_until`` AND
|
|
``b_from <= a_until``.
|
|
|
|
The half-open semantics ``[from, until)`` apply: ``a_from ==
|
|
b_until`` is *not* an overlap (the windows meet but don't
|
|
share a moment).
|
|
"""
|
|
if (a.valid_from is None and a.valid_until is None) or (
|
|
b.valid_from is None and b.valid_until is None
|
|
):
|
|
return True
|
|
from .time_model import _cmp_atoms
|
|
# a_from <= b_until (when both have those bounds)
|
|
if a.valid_from is not None and b.valid_until is not None:
|
|
if _cmp_atoms(a.valid_from, b.valid_until) >= 0:
|
|
return False
|
|
# b_from <= a_until (when both have those bounds)
|
|
if b.valid_from is not None and a.valid_until is not None:
|
|
if _cmp_atoms(b.valid_from, a.valid_until) >= 0:
|
|
return False
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ConsistencyRunner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ConsistencyRunner:
|
|
"""Walk an in-memory :class:`Graph` and produce violations.
|
|
|
|
Usage::
|
|
|
|
runner = ConsistencyRunner()
|
|
run = runner.run(graph)
|
|
for v in runner.last_violations:
|
|
...
|
|
|
|
The runner is stateless apart from the most-recent run's
|
|
violations and the per-run history (used by ``latest_run()``).
|
|
Multiple sequential ``run()`` calls are supported.
|
|
"""
|
|
|
|
# Edge relations that imply the subject participates in the
|
|
# object. For anachronism detection, these are the relations
|
|
# whose edges should fall within the subject's lifespan.
|
|
PARTICIPATION_RELATIONS = frozenset({
|
|
"PARTICIPATED_IN",
|
|
"RULED",
|
|
"LOCATED_IN",
|
|
"POSSESSES",
|
|
"CAUSED",
|
|
"CREATED",
|
|
})
|
|
|
|
def __init__(self) -> None:
|
|
self.last_violations: list = []
|
|
self._history: list[ConsistencyRun] = []
|
|
|
|
# ----- public API --------------------------------------------------
|
|
|
|
def run(self, graph: Graph, config=None) -> ConsistencyRun:
|
|
"""Run all category A/B/D rules against ``graph`` and all
|
|
registered ontology rules (Category C).
|
|
|
|
``config`` is an optional :class:`ConsistencyConfig` from
|
|
:mod:`lore_engine_poc.consistency_config`. When omitted,
|
|
the runner uses the slice 2.1 defaults (severity=warn, no
|
|
disabled rules, no threshold, no acknowledged ids).
|
|
|
|
Returns the :class:`ConsistencyRun` summary; the violation
|
|
list is stashed on ``self.last_violations``.
|
|
"""
|
|
# Late import to avoid a circular import: the config
|
|
# module is small and self-contained.
|
|
from .consistency_config import ConsistencyConfig
|
|
from .ontology_rules import list_rules
|
|
if config is None:
|
|
config = ConsistencyConfig()
|
|
|
|
started_at = _utc_now_iso()
|
|
violations: list = []
|
|
violations.extend(self._detect_contradictions(graph))
|
|
violations.extend(self._detect_anachronisms(graph))
|
|
violations.extend(self._detect_orphans(graph))
|
|
# Category C — ontology rules. Honour disable_rules.
|
|
for rule in list_rules():
|
|
if config.is_disabled(rule.rule_id):
|
|
continue
|
|
violations.extend(rule.fn(graph))
|
|
# Apply severity override.
|
|
for v in violations:
|
|
v.severity = config.severity
|
|
# Apply confidence threshold (suppress).
|
|
if config.confidence_threshold > 0:
|
|
violations = [
|
|
v for v in violations
|
|
if _violation_min_source_confidence(v) >= config.confidence_threshold
|
|
]
|
|
# Apply acknowledge set (suppress).
|
|
if config.acknowledged:
|
|
violations = [
|
|
v for v in violations
|
|
if not config.is_acknowledged(v.id)
|
|
]
|
|
finished_at = _utc_now_iso()
|
|
|
|
# Approximate duration_ms; we don't import time.perf_counter
|
|
# to keep the test surface clean, but a real deployment
|
|
# would. The slice 2.1 POC uses started_at/finished_at
|
|
# directly and reports 0 ms — slice 2.5 will switch to a
|
|
# real timer.
|
|
run = ConsistencyRun(
|
|
id=f"run-{uuid.uuid4().hex[:8]}",
|
|
started_at=started_at,
|
|
finished_at=finished_at,
|
|
duration_ms=0,
|
|
rules_run=10, # the 10 starter rules
|
|
violations_found=sum(1 for v in violations if isinstance(v, Contradiction)),
|
|
anachronisms_found=sum(1 for v in violations if isinstance(v, Anachronism)),
|
|
orphans_found=sum(1 for v in violations if isinstance(v, Orphan)),
|
|
)
|
|
|
|
self.last_violations = violations
|
|
self._history.append(run)
|
|
return run
|
|
|
|
def latest_run(self) -> Optional[ConsistencyRun]:
|
|
"""The most recent run summary, or ``None`` if no run yet."""
|
|
return self._history[-1] if self._history else None
|
|
|
|
# ----- Category A --------------------------------------------------
|
|
|
|
def _detect_contradictions(self, graph: Graph) -> list[Contradiction]:
|
|
"""Find pairs of edges that share a relation and disagree on
|
|
one endpoint (the subject or the object) but agree on the
|
|
other.
|
|
|
|
The two patterns are:
|
|
|
|
* **Same subject, same relation, different objects.** Aldric
|
|
is in House Vyr (edge 1) and House Mardonus (edge 2) at
|
|
the same time. One of those memberships is wrong.
|
|
* **Same object, same relation, different subjects.** Two
|
|
family trees say "Theron is Aldric's father" and "Maric
|
|
is Aldric's father" — both edges have
|
|
``object = Aldric`` but differ on subject.
|
|
|
|
Time-window overlap is required (otherwise the
|
|
relationship could be valid across non-overlapping eras).
|
|
"""
|
|
out: list[Contradiction] = []
|
|
# Flatten all edges (excluding SOURCED_FROM / _LORESOURCE_NODE).
|
|
all_edges: list[Edge] = []
|
|
for subject in graph.all_names():
|
|
for e in graph.edges_for_subject(subject):
|
|
if e.relation in ("SOURCED_FROM", "_LORESOURCE_NODE"):
|
|
continue
|
|
all_edges.append(e)
|
|
# Compare every pair.
|
|
for i in range(len(all_edges)):
|
|
for j in range(i + 1, len(all_edges)):
|
|
a, b = all_edges[i], all_edges[j]
|
|
if a.relation != b.relation:
|
|
continue
|
|
# Pattern 1: same subject, different object.
|
|
if a.subject == b.subject and a.object != b.object:
|
|
if not _edge_window_overlap(a, b):
|
|
continue
|
|
sources = tuple(sorted(set(a.sources) | set(b.sources)))
|
|
out.append(Contradiction(
|
|
id=f"c-{uuid.uuid4().hex[:8]}",
|
|
subject=a.subject,
|
|
predicate=a.relation,
|
|
claim_a=a.object,
|
|
claim_b=b.object,
|
|
sources=sources,
|
|
))
|
|
continue
|
|
# Pattern 2: same object, different subject.
|
|
if a.object == b.object and a.subject != b.subject:
|
|
if not _edge_window_overlap(a, b):
|
|
continue
|
|
# The "subject" of the contradiction is the
|
|
# shared endpoint (the entity the two claims
|
|
# are about). The "predicate" is the relation.
|
|
# The two claims are the differing endpoints.
|
|
# Sort the two claims so ``claim_a`` is
|
|
# deterministically the smaller one — the
|
|
# consistency runner's output must be stable
|
|
# across runs (and across backends) so tests
|
|
# can assert exact values and MCP clients see
|
|
# the same violation id in repeated runs.
|
|
claim_a, claim_b = sorted([a.subject, b.subject])
|
|
sources = tuple(sorted(set(a.sources) | set(b.sources)))
|
|
out.append(Contradiction(
|
|
id=f"c-{uuid.uuid4().hex[:8]}",
|
|
subject=b.object,
|
|
predicate=a.relation,
|
|
claim_a=claim_a,
|
|
claim_b=claim_b,
|
|
sources=sources,
|
|
))
|
|
return out
|
|
|
|
# ----- Category B --------------------------------------------------
|
|
|
|
def _detect_anachronisms(self, graph: Graph) -> list[Anachronism]:
|
|
"""Flag participation edges whose bounds fall outside the
|
|
subject's inferred lifespan.
|
|
"""
|
|
out: list[Anachronism] = []
|
|
for subject in graph.all_names():
|
|
sub_edges = list(graph.edges_for_subject(subject))
|
|
birth, death = _infer_person_lifespan(sub_edges)
|
|
# No lifespan → can't check.
|
|
if birth is None and death is None:
|
|
continue
|
|
for e in sub_edges:
|
|
if e.relation not in self.PARTICIPATION_RELATIONS:
|
|
continue
|
|
# The participating edge's bounds must fall
|
|
# within the Person's [birth, death] window.
|
|
# Distinguish two failure modes:
|
|
#
|
|
# * Event is *before* the entity's birth
|
|
# (valid_from < birth): the entity existed
|
|
# AFTER the event → "EXISTED_AFTER".
|
|
# * Event is *after* the entity's death
|
|
# (valid_until > death): the entity existed
|
|
# BEFORE the event → "EXISTED_BEFORE".
|
|
#
|
|
# The first check is the "before birth" case;
|
|
# the second is the "after death" case. They
|
|
# are mutually exclusive when the entity's
|
|
# lifespan is non-empty.
|
|
is_before_birth = (
|
|
e.valid_from is not None
|
|
and birth is not None
|
|
and not time_in_window(e.valid_from, birth, death)
|
|
and _time_lt(e.valid_from, birth)
|
|
)
|
|
if is_before_birth:
|
|
out.append(Anachronism(
|
|
id=f"a-{uuid.uuid4().hex[:8]}",
|
|
entity_name=subject,
|
|
event_name=e.object,
|
|
claim="EXISTED_AFTER",
|
|
expected=birth,
|
|
actual=e.valid_from,
|
|
sources=tuple(e.sources),
|
|
source_confidences=tuple(e.source_confidences),
|
|
))
|
|
continue
|
|
is_after_death = (
|
|
e.valid_until is not None
|
|
and death is not None
|
|
and not time_in_window(e.valid_until, birth, death)
|
|
and _time_gt(e.valid_until, death)
|
|
)
|
|
if is_after_death:
|
|
out.append(Anachronism(
|
|
id=f"a-{uuid.uuid4().hex[:8]}",
|
|
entity_name=subject,
|
|
event_name=e.object,
|
|
claim="EXISTED_BEFORE",
|
|
expected=death,
|
|
actual=e.valid_until,
|
|
sources=tuple(e.sources),
|
|
source_confidences=tuple(e.source_confidences),
|
|
))
|
|
return out
|
|
|
|
# ----- Category D --------------------------------------------------
|
|
|
|
# Pre-baked reason vocabulary, mirroring
|
|
# ``docs/04-consistency.md`` §Category D. A UI can group on
|
|
# these strings without parsing free text.
|
|
ORPHAN_REASON_NO_RELATIONSHIPS = "Entity with no recorded relationships"
|
|
ORPHAN_REASON_UNKNOWN_LINEAGE = "Person of unknown lineage"
|
|
ORPHAN_REASON_UNKNOWN_ORIGIN = "Faction of unknown origin"
|
|
ORPHAN_REASON_UNMAPPED = "Unmapped location"
|
|
ORPHAN_REASON_NO_LOCATION = "Event with no location"
|
|
ORPHAN_REASON_NO_ERA = "Event with no era"
|
|
ORPHAN_REASON_UNOWNED = "Unowned artifact"
|
|
ORPHAN_REASON_NO_MAGIC_SYSTEM = "Spell with no magic system"
|
|
|
|
def _detect_orphans(self, graph: Graph) -> list[Orphan]:
|
|
"""Flag entities with no edges going *out* from them.
|
|
|
|
An entity is an orphan when it has no ``edges_by_subject``
|
|
entry — meaning no relationship uses it as a subject. A
|
|
Faction that only ever appears as the ``object`` of
|
|
MEMBER_OF edges (its members point at it) is *not* an
|
|
orphan; it's structurally important even though it never
|
|
acts as a subject. The same goes for a Location that's
|
|
only ever the object of LOCATED_IN edges.
|
|
|
|
For the slice 2.1 POC, the runner emits the
|
|
"Person of unknown lineage" reason for any orphan entity
|
|
— the in-memory graph has no separate label registry. The
|
|
finer-grained Faction/Location/Event reasons land in slice
|
|
2.2 when the ontology-rule runner has access to type
|
|
metadata.
|
|
"""
|
|
out: list[Orphan] = []
|
|
for name in graph.all_names():
|
|
if list(graph.edges_for_subject(name)):
|
|
# Has at least one outgoing edge. Not an orphan.
|
|
continue
|
|
# The entity has no outgoing edges. Check whether it's
|
|
# referenced as an object anywhere — if not, it's a
|
|
# truly isolated entity.
|
|
appears_as_object = False
|
|
for other_subject in graph.all_names():
|
|
for e in graph.edges_for_subject(other_subject):
|
|
if e.object == name:
|
|
appears_as_object = True
|
|
break
|
|
if appears_as_object:
|
|
break
|
|
if appears_as_object:
|
|
continue
|
|
out.append(Orphan(
|
|
id=f"o-{uuid.uuid4().hex[:8]}",
|
|
entity_name=name,
|
|
entity_type="Person",
|
|
reason=self.ORPHAN_REASON_UNKNOWN_LINEAGE,
|
|
))
|
|
return out
|
|
|
|
|
|
__all__ = ["ConsistencyRunner", "RunnerResult"]
|