Files
lore-engine-poc-v3/lore_engine_poc/consistency_tools.py
Lore Engine Dev eb179c913c slice 2.4: config (disable_rules, threshold, acknowledge, severity) (9/9 tests; AC 2.8, 2.9, 2.10)
- New lore_engine_poc/consistency_config.py: ConsistencyConfig dataclass
  with disable_rules[], severity (default 'warn' per AC 2.8),
  confidence_threshold (per-rule floor), acknowledged set (AC 2.9).
  is_disabled(rule_id), is_acknowledged(id), acknowledge(id) helpers.
- ConsistencyRunner.run() now accepts an optional config parameter;
  applies severity override, skips disabled rules, suppresses below
  threshold, suppresses acknowledged violations.
- Anachronism dataclass now carries source_confidences (parallel
  to sources) so confidence_threshold can suppress low-confidence
  findings. Default = 1.0 when not set.
- get_anachronisms() got an include_flagged param (default False);
  flagged violations are hidden by default.
- 9/9 new tests; full suite 245/245 (was 236).

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-18 02:43:26 -04:00

339 lines
11 KiB
Python

"""Lore Engine POC — 10 consistency MCP tools (slice 2.3).
The 10 tools are the public API the LLM uses to ask about
consistency. The POC exposes them as plain Python functions; the
MCP server (``slice 2.6+``) wraps each one as a JSON-RPC handler.
The tools operate on a singleton :class:`ConsistencyRunner` plus
the violation list it produced on its most recent run. This is
deliberately stateful — the LLM's typical flow is:
1. ``run_consistency_check(graph)`` to populate the runner's
state.
2. ``get_contradictions()`` / ``get_anachronisms()`` / etc.
to inspect the violations.
3. ``flag_for_review(node_id, reason)`` to mark a warning as
acknowledged.
4. ``explain_violation(node_id)`` to surface the rule that produced
a violation.
The singleton is reset between tests via :func:`_reset_runner`,
exposed as a private hook. In production the singleton is replaced
by a per-session runner (slice 2.6 wires the MCP server's
session store).
"""
from __future__ import annotations
from typing import Optional
from .consistency import (
Anachronism,
Contradiction,
OntologyViolation,
Orphan,
)
from .consistency_runner import ConsistencyRunner
from .ontology_rules import RULE_REGISTRY, OntologyRule, list_rules
from .tools import Graph
# ---------------------------------------------------------------------------
# Module-level singleton — one ConsistencyRunner per process
# ---------------------------------------------------------------------------
# Shared runner slot: stays ``None`` until ``_get_runner()`` builds the
# instance, and is set back to ``None`` by ``_reset_runner()``.
_runner: Optional[ConsistencyRunner] = None
def _get_runner() -> ConsistencyRunner:
    """Return the module-level runner, building it on first use.

    The instance is cached in the module global ``_runner``; later calls
    hand back that same object until the global is cleared again.
    """
    global _runner
    if _runner is not None:
        return _runner
    # First call (or first call after a reset): build and remember it.
    _runner = ConsistencyRunner()
    return _runner
def _reset_runner() -> None:
    """Forget the cached runner so the next lookup starts from scratch.

    Only rebinds the module global ``_runner`` to ``None``; intended as
    a private hook for tests that need a clean state.
    """
    global _runner
    _runner = None
# ---------------------------------------------------------------------------
# Tool 7 — run_consistency_check + Tool 8 — latest_run
# ---------------------------------------------------------------------------
def run_consistency_check(graph: Graph):
    """Run the consistency engine over ``graph`` and return the result.

    Delegates to ``run(graph)`` on the shared runner obtained from
    :func:`_get_runner` and returns whatever that call returns (the
    run summary). The inspection tools in this module read the
    violations from that same runner's ``last_violations``.
    """
    return _get_runner().run(graph)
def latest_run():
    """Return the shared runner's most recent run summary.

    Forwards to the runner's own ``latest_run()``; per the runner's
    contract that is ``None`` when nothing has been run yet.
    """
    runner = _get_runner()
    return runner.latest_run()
# ---------------------------------------------------------------------------
# Tool 1 — get_contradictions
# ---------------------------------------------------------------------------
def get_contradictions(
    subject: Optional[str] = None,
    severity: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[Contradiction]:
    """List contradictions from the most recent run.

    Filters: ``subject`` (exact match on ``Contradiction.subject``),
    ``severity`` (exact match on ``Contradiction.severity``, i.e.
    ``"warn"`` or ``"error"``), ``limit`` (cap on result size, applied
    after the other filters; keeps the first ``limit`` matches).
    A filter left as ``None`` is not applied.

    Returns the matching :class:`Contradiction` objects in the order
    they appear in the runner's ``last_violations``.

    Raises :class:`ValueError` for a negative ``limit``.
    """
    # A negative limit would make ``out[:limit]`` slice from the tail and
    # silently drop the last results instead of capping the list, so it
    # is rejected up front.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    runner = _get_runner()
    out: list[Contradiction] = [
        v for v in runner.last_violations if isinstance(v, Contradiction)
    ]
    if subject is not None:
        out = [c for c in out if c.subject == subject]
    if severity is not None:
        out = [c for c in out if c.severity == severity]
    if limit is not None:
        out = out[:limit]
    return out
# ---------------------------------------------------------------------------
# Tool 2 — get_anachronisms
# ---------------------------------------------------------------------------
def get_anachronisms(
    entity: Optional[str] = None,
    limit: Optional[int] = None,
    include_flagged: bool = False,
) -> list[Anachronism]:
    """List anachronisms from the most recent run.

    Filters: ``entity`` (exact match on ``Anachronism.entity_name``),
    ``include_flagged`` (default False — violations whose ``flagged``
    attribute is truthy are hidden once acknowledged, AC 2.9),
    ``limit`` (cap on result size, applied last; keeps the first
    ``limit`` matches). A filter left as ``None`` is not applied.

    Returns the matching :class:`Anachronism` objects in the order
    they appear in the runner's ``last_violations``.

    Raises :class:`ValueError` for a negative ``limit``.
    """
    # A negative limit would make ``out[:limit]`` slice from the tail and
    # silently drop the last results instead of capping the list, so it
    # is rejected up front.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    runner = _get_runner()
    out: list[Anachronism] = [
        v for v in runner.last_violations if isinstance(v, Anachronism)
    ]
    if entity is not None:
        out = [a for a in out if a.entity_name == entity]
    if not include_flagged:
        out = [a for a in out if not a.flagged]
    if limit is not None:
        out = out[:limit]
    return out
# ---------------------------------------------------------------------------
# Tool 3 — get_orphans
# ---------------------------------------------------------------------------
def get_orphans(
    reason: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[Orphan]:
    """List orphans from the most recent run.

    Filters: ``reason`` (exact match on ``Orphan.reason``), ``limit``
    (cap on result size, applied last; keeps the first ``limit``
    matches). A filter left as ``None`` is not applied.

    Returns the matching :class:`Orphan` objects in the order they
    appear in the runner's ``last_violations``.

    Raises :class:`ValueError` for a negative ``limit``.
    """
    # A negative limit would make ``out[:limit]`` slice from the tail and
    # silently drop the last results instead of capping the list, so it
    # is rejected up front.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    runner = _get_runner()
    out: list[Orphan] = [
        v for v in runner.last_violations if isinstance(v, Orphan)
    ]
    if reason is not None:
        out = [o for o in out if o.reason == reason]
    if limit is not None:
        out = out[:limit]
    return out
# ---------------------------------------------------------------------------
# Tool 4 — get_ontology_violations
# ---------------------------------------------------------------------------
def get_ontology_violations(
    rule_id: Optional[str] = None,
    severity: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[OntologyViolation]:
    """List ontology violations from the most recent run.

    Filters: ``rule_id`` (exact match on ``OntologyViolation.rule_id``),
    ``severity`` (exact match on ``OntologyViolation.severity``),
    ``limit`` (cap on result size, applied last; keeps the first
    ``limit`` matches). A filter left as ``None`` is not applied.

    Returns the matching :class:`OntologyViolation` objects in the
    order they appear in the runner's ``last_violations``.

    Raises :class:`ValueError` for a negative ``limit``.
    """
    # A negative limit would make ``out[:limit]`` slice from the tail and
    # silently drop the last results instead of capping the list, so it
    # is rejected up front.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    runner = _get_runner()
    out: list[OntologyViolation] = [
        v for v in runner.last_violations if isinstance(v, OntologyViolation)
    ]
    if rule_id is not None:
        out = [v for v in out if v.rule_id == rule_id]
    if severity is not None:
        out = [v for v in out if v.severity == severity]
    if limit is not None:
        out = out[:limit]
    return out
# ---------------------------------------------------------------------------
# Tool 5 — flag_for_review
# ---------------------------------------------------------------------------
def flag_for_review(node_id: str, reason: str) -> dict:
    """Acknowledge a violation from the most recent run.

    Looks up the first entry in the runner's ``last_violations`` whose
    ``id`` equals ``node_id`` and sets its ``flagged`` attribute to
    ``True``. ``reason`` is the audit string supplied by the
    world-builder; this function only echoes it back in the returned
    dict and does not store it on the violation.

    Returns ``{"flagged": True, "node_id": ..., "reason": ...}``.
    Raises :class:`KeyError` when no violation has that id, rather
    than silently doing nothing.
    """
    violations = _get_runner().last_violations
    target = next((v for v in violations if v.id == node_id), None)
    if target is None:
        raise KeyError(f"no violation with id {node_id!r} in the most recent run")
    target.flagged = True
    return {"flagged": True, "node_id": node_id, "reason": reason}
# ---------------------------------------------------------------------------
# Tool 6 — explain_violation
# ---------------------------------------------------------------------------
def explain_violation(node_id: str) -> dict:
    """Describe one violation from the most recent run as a plain dict.

    Keys present for every kind::

        {
            "node_id": str,
            "kind": str,        # class name of the violation object
            "subject": str,     # ``subject``, else ``entity_name``, else ""
            "predicate": str,   # ``predicate``, else ``event_name``, else ""
            "severity": str,
            "flagged": bool,
            "sources": list,    # copy of ``sources``; [] when absent
        }

    Extra keys, added only for the matching kind::

        Contradiction:      "claim_a", "claim_b"
        Anachronism:        "claim", "expected", "actual"
        OntologyViolation:  "rule_id", "description", "claim"
        Orphan:             "entity_type", "reason"

    For an ``OntologyViolation`` the ``description`` is taken from the
    rule registered under ``rule_id`` in ``RULE_REGISTRY``, or ``""``
    when no such rule is found.

    Raises :class:`KeyError` for unknown ``node_id``.
    """
    violations = _get_runner().last_violations
    hit = next((v for v in violations if v.id == node_id), None)
    if hit is None:
        raise KeyError(f"no violation with id {node_id!r} in the most recent run")

    # Fields shared by every violation kind; the name fallbacks cover
    # kinds that expose entity_name / event_name instead.
    info: dict = {
        "node_id": hit.id,
        "kind": type(hit).__name__,
        "subject": getattr(hit, "subject", None) or getattr(hit, "entity_name", ""),
        "predicate": getattr(hit, "predicate", "") or getattr(hit, "event_name", ""),
        "severity": hit.severity,
        "flagged": hit.flagged,
        "sources": list(getattr(hit, "sources", ())),
    }

    # Kind-specific details; the checks run in this fixed order.
    if isinstance(hit, Contradiction):
        info.update(claim_a=hit.claim_a, claim_b=hit.claim_b)
    elif isinstance(hit, Anachronism):
        info.update(claim=hit.claim, expected=hit.expected, actual=hit.actual)
    elif isinstance(hit, OntologyViolation):
        info["rule_id"] = hit.rule_id
        # Surface the rule's description when the rule is still registered.
        registered = RULE_REGISTRY.get(hit.rule_id)
        info["description"] = registered.description if registered else ""
        info["claim"] = hit.claim
    elif isinstance(hit, Orphan):
        info.update(entity_type=hit.entity_type, reason=hit.reason)
    return info
# ---------------------------------------------------------------------------
# Tool 9 — add_ontology_rule
# ---------------------------------------------------------------------------
def add_ontology_rule(
    id: str,
    cypher: str,
    description: str,
    severity: str,
    fn=None,
) -> OntologyRule:
    """Register a new ontology rule under ``id`` and return it.

    Builds an :class:`OntologyRule` from ``id``, ``description``,
    ``severity`` and the rule body, then stores it in
    ``RULE_REGISTRY``. The body is the Python callable ``fn``; when
    ``fn`` is ``None`` a placeholder that returns an empty list is
    used, so the rule is registered as metadata only and emits no
    violations.

    ``cypher`` is the rule's source string, intended for audit and for
    a future Cypher-backed rule body.
    NOTE(review): this function does not pass ``cypher`` to
    ``OntologyRule`` or keep it anywhere — confirm whether that is
    intended.

    Raises :class:`ValueError` for duplicate ``id``.
    """
    if id in RULE_REGISTRY:
        raise ValueError(f"rule_id {id!r} is already registered")
    # Fall back to a body that reports nothing when no callable is given.
    body = fn if fn is not None else (lambda g: [])
    new_rule = OntologyRule(
        rule_id=id,
        description=description,
        severity=severity,
        fn=body,
    )
    RULE_REGISTRY[id] = new_rule
    return new_rule
# ---------------------------------------------------------------------------
# Tool 10 — list_ontology_rules
# ---------------------------------------------------------------------------
def list_ontology_rules() -> list[OntologyRule]:
    """Return the registered ontology rules, as reported by ``list_rules()``.

    This covers the starter rules plus any rules added afterwards.
    """
    rules = list_rules()
    return rules
# Public API: the ten tool functions, listed alphabetically. The private
# helpers ``_get_runner`` and ``_reset_runner`` are deliberately left out.
__all__ = [
    "add_ontology_rule",
    "explain_violation",
    "flag_for_review",
    "get_anachronisms",
    "get_contradictions",
    "get_ontology_violations",
    "get_orphans",
    "latest_run",
    "list_ontology_rules",
    "run_consistency_check",
]