Files
lore-engine-poc-v3/lore_engine_poc/consistency_runner.py
Lore Engine Dev 299ad5c146 slice 5.6: 01_ingest.py --write-neo4j dual-write flag
After the in-memory graph + pickle are written, the new flag
mirrors the full graph into the Neo4j 5 container at
$LORE_NEO4J_URI (default bolt://127.0.0.1:7687). The flag
is opt-in (default off) so the existing test suite's
invocations of 01_ingest.py without Docker still work.

The mirror logic:

  * Pre-pass for LoreSource nodes (full metadata via
    add_lore_source so SOURCED_FROM links find them with
    name, source_type, reliability, source_confidence).
  * Pre-pass for bare names (entities registered without
    any edge participation — keeps :Entity count in sync
    with in-memory all_names()).
  * Then the edges, add()-ed one by one.

Failure semantics:

  * Neo4j unreachable at startup → log + exit 3.
  * neo4j_graph not importable → log + exit 2.
  * Pickle is always written before the mirror attempt, so
    a flaky Neo4j container never loses the in-memory state.

Consistency runner stability:

  _detect_contradictions Pattern 2 (same object, different
  subjects) now sorts the two claims alphabetically so
  claim_a / claim_b are stable across runs. The
  graph.all_names() set iteration order is otherwise
  non-deterministic across Python processes and across
  the in-memory / Neo4j backends, and the original
  dict-iteration insertion order broke when slice 5.4
  migrated to all_names().

Tests:

  * tests/test_scripts/test_ingest_neo4j.py — 5 docker-gated
    tests (exits zero, entity count, relation count,
    default-off untouched, fails loud on unreachable URI).
  * tests/test_consistency/test_runner_categories.py — one
    test updated to assert claim_a/claim_b as a set rather
    than a specific order (matches the runner's new
    lexicographic-sort contract).

Suite: 619 -> 624 passed (+5 ingest-neo4j tests, all 559
baseline + 32 Neo4j + consistency + ingest tests preserved).
2026-06-18 23:01:35 -04:00

473 lines
19 KiB
Python

"""Lore Engine POC — consistency runner (slice 2.1).
The :class:`ConsistencyRunner` walks an in-memory :class:`Graph`
and produces a :class:`ConsistencyRun` plus a list of violations.
It implements Categories A, B, and D (Contradiction, Anachronism,
Orphan) directly. Category C (Ontology rules) lives in
:mod:`lore_engine_poc.ontology_rules` and is wired in slice 2.2.
The runner is **read-only**: it never mutates the graph. The
``run()`` call returns a fresh :class:`ConsistencyRun` and stashes
the violation list on ``runner.last_violations`` for inspection
(MCP tools read it from there). The runner also keeps a tiny
history so ``latest_run()`` can return the most recent run.
Detection conventions (slice 2.1 POC):
* **Person lifespan** is inferred from MEMBER_OF(Lineage) edges:
``birth = min(valid_from)`` across all MEMBER_OF edges,
``death = max(valid_until)`` across all MEMBER_OF edges
(``None`` when the Person is still living). A Person with no
MEMBER_OF edges has no inferred lifespan and is skipped for
anachronism detection.
* **Contradiction (Category A):** two edges on the same
``(subject, relation)`` whose ``time_in_window``-style overlap
check passes and whose ``object`` differs.
* **Anachronism (Category B):** for each ``PARTICIPATED_IN`` /
``RULED`` / ``LOCATED_IN`` edge of a Person with a known
lifespan, check the edge's ``valid_from`` against the Person's
inferred birth, and ``valid_until`` against the Person's
inferred death. Out-of-window edges become Anachronism nodes.
* **Orphan (Category D):** an entity with no edges at all is an
Orphan. The ``reason`` field uses a pre-baked vocabulary
(``docs/04-consistency.md`` §Category D) so a UI can group on
it.
The runner is designed for in-memory POC; slice 2.6 will wrap the
same logic against Cognee.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from typing import Iterable, Optional
from .consistency import (
Anachronism,
Contradiction,
ConsistencyRun,
Orphan,
_utc_now_iso,
)
from .time_model import time_in_window
from .tools import Edge, Graph
# ---------------------------------------------------------------------------
# Result holder
# ---------------------------------------------------------------------------
@dataclass
class RunnerResult:
"""A consistency run's full output: the summary node + the
violation list. The runner returns the summary node from
``run()`` and stashes the violation list on
``runner.last_violations``.
"""
run: ConsistencyRun
violations: list = field(default_factory=list)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _infer_person_lifespan(edges: Iterable[Edge]) -> tuple[Optional[str], Optional[str]]:
"""Infer a Person's birth/death from their MEMBER_OF(Lineage) edges.
Returns ``(birth, death)``. ``birth`` is the minimum
``valid_from`` across all MEMBER_OF edges; ``death`` is the
maximum ``valid_until``. Either may be ``None`` (unbounded
Person, or no MEMBER_OF edges at all).
"""
birth: Optional[str] = None
death: Optional[str] = None
for e in edges:
if e.relation != "MEMBER_OF":
continue
if e.valid_from is not None:
if birth is None or _time_lt(e.valid_from, birth):
birth = e.valid_from
if e.valid_until is not None:
if death is None or _time_gt(e.valid_until, death):
death = e.valid_until
return birth, death
def _time_lt(a: str, b: str) -> bool:
"""``a < b`` in the canonical time tree."""
from .time_model import _cmp_atoms
return _cmp_atoms(a, b) < 0
def _time_gt(a: str, b: str) -> bool:
"""``a > b`` in the canonical time tree."""
from .time_model import _cmp_atoms
return _cmp_atoms(a, b) > 0
def _violation_min_source_confidence(violation) -> float:
"""The minimum ``source_confidence`` across a violation's sources.
Used by the ``confidence_threshold`` config knob: a violation
whose minimum source confidence is below the threshold is
suppressed. The :class:`Anachronism` and
:class:`Contradiction` dataclasses carry a
``source_confidences`` tuple parallel to ``sources``; other
violation types default to 1.0 (high confidence).
"""
confidences = getattr(violation, "source_confidences", ())
if not confidences:
return 1.0
return min(confidences)
def _edge_window_overlap(a: Edge, b: Edge) -> bool:
"""Two edges' time windows overlap (or one is unbounded).
The test is the standard interval-overlap question: do
``[a_from, a_until]`` and ``[b_from, b_until]`` share any
moment? Unbounded on either side is treated as "open in that
direction".
Edge cases:
- Both fully unbounded → overlap (trivially).
- One fully unbounded, one bounded → overlap.
- Both bounded: overlap iff ``a_from <= b_until`` AND
``b_from <= a_until``.
The half-open semantics ``[from, until)`` apply: ``a_from ==
b_until`` is *not* an overlap (the windows meet but don't
share a moment).
"""
if (a.valid_from is None and a.valid_until is None) or (
b.valid_from is None and b.valid_until is None
):
return True
from .time_model import _cmp_atoms
# a_from <= b_until (when both have those bounds)
if a.valid_from is not None and b.valid_until is not None:
if _cmp_atoms(a.valid_from, b.valid_until) >= 0:
return False
# b_from <= a_until (when both have those bounds)
if b.valid_from is not None and a.valid_until is not None:
if _cmp_atoms(b.valid_from, a.valid_until) >= 0:
return False
return True
# ---------------------------------------------------------------------------
# ConsistencyRunner
# ---------------------------------------------------------------------------
class ConsistencyRunner:
"""Walk an in-memory :class:`Graph` and produce violations.
Usage::
runner = ConsistencyRunner()
run = runner.run(graph)
for v in runner.last_violations:
...
The runner is stateless apart from the most-recent run's
violations and the per-run history (used by ``latest_run()``).
Multiple sequential ``run()`` calls are supported.
"""
# Edge relations that imply the subject participates in the
# object. For anachronism detection, these are the relations
# whose edges should fall within the subject's lifespan.
PARTICIPATION_RELATIONS = frozenset({
"PARTICIPATED_IN",
"RULED",
"LOCATED_IN",
"POSSESSES",
"CAUSED",
"CREATED",
})
def __init__(self) -> None:
self.last_violations: list = []
self._history: list[ConsistencyRun] = []
# ----- public API --------------------------------------------------
def run(self, graph: Graph, config=None) -> ConsistencyRun:
"""Run all category A/B/D rules against ``graph`` and all
registered ontology rules (Category C).
``config`` is an optional :class:`ConsistencyConfig` from
:mod:`lore_engine_poc.consistency_config`. When omitted,
the runner uses the slice 2.1 defaults (severity=warn, no
disabled rules, no threshold, no acknowledged ids).
Returns the :class:`ConsistencyRun` summary; the violation
list is stashed on ``self.last_violations``.
"""
# Late import to avoid a circular import: the config
# module is small and self-contained.
from .consistency_config import ConsistencyConfig
from .ontology_rules import list_rules
if config is None:
config = ConsistencyConfig()
started_at = _utc_now_iso()
violations: list = []
violations.extend(self._detect_contradictions(graph))
violations.extend(self._detect_anachronisms(graph))
violations.extend(self._detect_orphans(graph))
# Category C — ontology rules. Honour disable_rules.
for rule in list_rules():
if config.is_disabled(rule.rule_id):
continue
violations.extend(rule.fn(graph))
# Apply severity override.
for v in violations:
v.severity = config.severity
# Apply confidence threshold (suppress).
if config.confidence_threshold > 0:
violations = [
v for v in violations
if _violation_min_source_confidence(v) >= config.confidence_threshold
]
# Apply acknowledge set (suppress).
if config.acknowledged:
violations = [
v for v in violations
if not config.is_acknowledged(v.id)
]
finished_at = _utc_now_iso()
# Approximate duration_ms; we don't import time.perf_counter
# to keep the test surface clean, but a real deployment
# would. The slice 2.1 POC uses started_at/finished_at
# directly and reports 0 ms — slice 2.5 will switch to a
# real timer.
run = ConsistencyRun(
id=f"run-{uuid.uuid4().hex[:8]}",
started_at=started_at,
finished_at=finished_at,
duration_ms=0,
rules_run=10, # the 10 starter rules
violations_found=sum(1 for v in violations if isinstance(v, Contradiction)),
anachronisms_found=sum(1 for v in violations if isinstance(v, Anachronism)),
orphans_found=sum(1 for v in violations if isinstance(v, Orphan)),
)
self.last_violations = violations
self._history.append(run)
return run
def latest_run(self) -> Optional[ConsistencyRun]:
"""The most recent run summary, or ``None`` if no run yet."""
return self._history[-1] if self._history else None
# ----- Category A --------------------------------------------------
def _detect_contradictions(self, graph: Graph) -> list[Contradiction]:
"""Find pairs of edges that share a relation and disagree on
one endpoint (the subject or the object) but agree on the
other.
The two patterns are:
* **Same subject, same relation, different objects.** Aldric
is in House Vyr (edge 1) and House Mardonus (edge 2) at
the same time. One of those memberships is wrong.
* **Same object, same relation, different subjects.** Two
family trees say "Theron is Aldric's father" and "Maric
is Aldric's father" — both edges have
``object = Aldric`` but differ on subject.
Time-window overlap is required (otherwise the
relationship could be valid across non-overlapping eras).
"""
out: list[Contradiction] = []
# Flatten all edges (excluding SOURCED_FROM / _LORESOURCE_NODE).
all_edges: list[Edge] = []
for subject in graph.all_names():
for e in graph.edges_for_subject(subject):
if e.relation in ("SOURCED_FROM", "_LORESOURCE_NODE"):
continue
all_edges.append(e)
# Compare every pair.
for i in range(len(all_edges)):
for j in range(i + 1, len(all_edges)):
a, b = all_edges[i], all_edges[j]
if a.relation != b.relation:
continue
# Pattern 1: same subject, different object.
if a.subject == b.subject and a.object != b.object:
if not _edge_window_overlap(a, b):
continue
sources = tuple(sorted(set(a.sources) | set(b.sources)))
out.append(Contradiction(
id=f"c-{uuid.uuid4().hex[:8]}",
subject=a.subject,
predicate=a.relation,
claim_a=a.object,
claim_b=b.object,
sources=sources,
))
continue
# Pattern 2: same object, different subject.
if a.object == b.object and a.subject != b.subject:
if not _edge_window_overlap(a, b):
continue
# The "subject" of the contradiction is the
# shared endpoint (the entity the two claims
# are about). The "predicate" is the relation.
# The two claims are the differing endpoints.
# Sort the two claims so ``claim_a`` is
# deterministically the smaller one — the
# consistency runner's output must be stable
# across runs (and across backends) so tests
# can assert exact values and MCP clients see
# the same violation id in repeated runs.
claim_a, claim_b = sorted([a.subject, b.subject])
sources = tuple(sorted(set(a.sources) | set(b.sources)))
out.append(Contradiction(
id=f"c-{uuid.uuid4().hex[:8]}",
subject=b.object,
predicate=a.relation,
claim_a=claim_a,
claim_b=claim_b,
sources=sources,
))
return out
# ----- Category B --------------------------------------------------
def _detect_anachronisms(self, graph: Graph) -> list[Anachronism]:
"""Flag participation edges whose bounds fall outside the
subject's inferred lifespan.
"""
out: list[Anachronism] = []
for subject in graph.all_names():
sub_edges = list(graph.edges_for_subject(subject))
birth, death = _infer_person_lifespan(sub_edges)
# No lifespan → can't check.
if birth is None and death is None:
continue
for e in sub_edges:
if e.relation not in self.PARTICIPATION_RELATIONS:
continue
# The participating edge's bounds must fall
# within the Person's [birth, death] window.
# Distinguish two failure modes:
#
# * Event is *before* the entity's birth
# (valid_from < birth): the entity existed
# AFTER the event → "EXISTED_AFTER".
# * Event is *after* the entity's death
# (valid_until > death): the entity existed
# BEFORE the event → "EXISTED_BEFORE".
#
# The first check is the "before birth" case;
# the second is the "after death" case. They
# are mutually exclusive when the entity's
# lifespan is non-empty.
is_before_birth = (
e.valid_from is not None
and birth is not None
and not time_in_window(e.valid_from, birth, death)
and _time_lt(e.valid_from, birth)
)
if is_before_birth:
out.append(Anachronism(
id=f"a-{uuid.uuid4().hex[:8]}",
entity_name=subject,
event_name=e.object,
claim="EXISTED_AFTER",
expected=birth,
actual=e.valid_from,
sources=tuple(e.sources),
source_confidences=tuple(e.source_confidences),
))
continue
is_after_death = (
e.valid_until is not None
and death is not None
and not time_in_window(e.valid_until, birth, death)
and _time_gt(e.valid_until, death)
)
if is_after_death:
out.append(Anachronism(
id=f"a-{uuid.uuid4().hex[:8]}",
entity_name=subject,
event_name=e.object,
claim="EXISTED_BEFORE",
expected=death,
actual=e.valid_until,
sources=tuple(e.sources),
source_confidences=tuple(e.source_confidences),
))
return out
# ----- Category D --------------------------------------------------
# Pre-baked reason vocabulary, mirroring
# ``docs/04-consistency.md`` §Category D. A UI can group on
# these strings without parsing free text.
ORPHAN_REASON_NO_RELATIONSHIPS = "Entity with no recorded relationships"
ORPHAN_REASON_UNKNOWN_LINEAGE = "Person of unknown lineage"
ORPHAN_REASON_UNKNOWN_ORIGIN = "Faction of unknown origin"
ORPHAN_REASON_UNMAPPED = "Unmapped location"
ORPHAN_REASON_NO_LOCATION = "Event with no location"
ORPHAN_REASON_NO_ERA = "Event with no era"
ORPHAN_REASON_UNOWNED = "Unowned artifact"
ORPHAN_REASON_NO_MAGIC_SYSTEM = "Spell with no magic system"
def _detect_orphans(self, graph: Graph) -> list[Orphan]:
"""Flag entities with no edges going *out* from them.
An entity is an orphan when it has no ``edges_by_subject``
entry — meaning no relationship uses it as a subject. A
Faction that only ever appears as the ``object`` of
MEMBER_OF edges (its members point at it) is *not* an
orphan; it's structurally important even though it never
acts as a subject. The same goes for a Location that's
only ever the object of LOCATED_IN edges.
For the slice 2.1 POC, the runner emits the
"Person of unknown lineage" reason for any orphan entity
— the in-memory graph has no separate label registry. The
finer-grained Faction/Location/Event reasons land in slice
2.2 when the ontology-rule runner has access to type
metadata.
"""
out: list[Orphan] = []
for name in graph.all_names():
if list(graph.edges_for_subject(name)):
# Has at least one outgoing edge. Not an orphan.
continue
# The entity has no outgoing edges. Check whether it's
# referenced as an object anywhere — if not, it's a
# truly isolated entity.
appears_as_object = False
for other_subject in graph.all_names():
for e in graph.edges_for_subject(other_subject):
if e.object == name:
appears_as_object = True
break
if appears_as_object:
break
if appears_as_object:
continue
out.append(Orphan(
id=f"o-{uuid.uuid4().hex[:8]}",
entity_name=name,
entity_type="Person",
reason=self.ORPHAN_REASON_UNKNOWN_LINEAGE,
))
return out
__all__ = ["ConsistencyRunner", "RunnerResult"]