# Change notes (commit message captured together with this file):
# - New lore_engine_poc/consistency_config.py: ConsistencyConfig dataclass with
#   disable_rules[], severity (default 'warn' per AC 2.8), confidence_threshold
#   (per-rule floor), and an acknowledged set (AC 2.9), plus the helpers
#   is_disabled(rule_id), is_acknowledged(id) and acknowledge(id).
# - ConsistencyRunner.run() now accepts an optional config parameter; it
#   applies the severity override, skips disabled rules, and suppresses
#   findings that are below the threshold or already acknowledged.
# - The Anachronism dataclass now carries source_confidences (parallel to
#   sources) so confidence_threshold can suppress low-confidence findings.
#   Default = 1.0 when not set.
# - get_anachronisms() got an include_flagged param (default False); flagged
#   violations are hidden by default.
# - 9/9 new tests; full suite 245/245 (was 236).
# Co-Authored-By: Claude <noreply@anthropic.com>
# File listing metadata: 339 lines, 11 KiB, Python.
"""Lore Engine POC — 10 consistency MCP tools (slice 2.3).

The 10 tools are the public API the LLM uses to ask about
consistency. The POC exposes them as plain Python functions; the
MCP server (``slice 2.6+``) wraps each one as a JSON-RPC handler.

The tools operate on a singleton :class:`ConsistencyRunner` plus
the violation list it produced on its most recent run. This is
deliberately stateful — the LLM's typical flow is:

1. ``run_consistency_check(graph)`` to populate the runner's
   state.
2. ``get_contradictions()`` / ``get_anachronisms()`` / etc.
   to inspect the violations.
3. ``flag_for_review(node_id, reason)`` to mark a warning as
   acknowledged.
4. ``explain_violation(node_id)`` to surface the rule that produced
   a violation.

The singleton is reset between tests via :func:`_reset_runner`,
exposed as a private hook. In production the singleton is replaced
by a per-session runner (slice 2.6 wires the MCP server's
session store).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from .consistency import (
|
|
Anachronism,
|
|
Contradiction,
|
|
OntologyViolation,
|
|
Orphan,
|
|
)
|
|
from .consistency_runner import ConsistencyRunner
|
|
from .ontology_rules import RULE_REGISTRY, OntologyRule, list_rules
|
|
from .tools import Graph
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level singleton — one ConsistencyRunner per process
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Shared runner instance. Stays ``None`` until ``_get_runner`` creates it on
# first use, and is set back to ``None`` by ``_reset_runner``.
_runner: Optional[ConsistencyRunner] = None
|
|
|
|
|
|
def _get_runner() -> ConsistencyRunner:
    """Return the module-wide runner, building it on first access."""
    global _runner
    runner = _runner
    if runner is None:
        # Nothing cached yet (first call, or first call after a reset):
        # construct the instance and remember it for subsequent calls.
        runner = _runner = ConsistencyRunner()
    return runner
|
|
|
|
|
|
def _reset_runner() -> None:
    """Forget the cached runner.

    Private hook for tests that need a clean state: after this call the
    module-level ``_runner`` is ``None`` again.
    """
    global _runner
    _runner = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 7 — run_consistency_check + Tool 8 — latest_run
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def run_consistency_check(graph: Graph):
    """Run the consistency engine over ``graph`` on the shared runner.

    The call is delegated to the runner's ``run`` method and its result
    (the :class:`ConsistencyRun` summary) is returned unchanged. The
    inspection tools in this module read ``last_violations`` from that
    same runner afterwards.
    """
    return _get_runner().run(graph)
|
|
|
|
|
|
def latest_run():
    """Return whatever the shared runner reports as its latest run.

    This is the most recent run summary; the runner is expected to give
    ``None`` when no run has happened yet (not enforced here).
    """
    runner = _get_runner()
    return runner.latest_run()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 1 — get_contradictions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_contradictions(
    subject: Optional[str] = None,
    severity: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[Contradiction]:
    """List contradictions from the most recent run.

    Args:
        subject: Keep only contradictions whose ``subject`` equals this
            value exactly. ``None`` disables the filter.
        severity: Keep only contradictions whose ``severity`` equals this
            value (``"warn"`` or ``"error"``). ``None`` disables the filter.
        limit: Cap on the result size, applied after the other filters.
            ``None`` means no cap; ``0`` yields an empty list.

    Returns:
        A new list with the matching contradictions, in the order they
        appear in the runner's ``last_violations``.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    # Validate before doing any work: used as a slice bound, a negative
    # limit would silently drop items from the *end* of the list instead
    # of capping its size.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    matches = [
        v
        for v in _get_runner().last_violations
        if isinstance(v, Contradiction)
        and (subject is None or v.subject == subject)
        and (severity is None or v.severity == severity)
    ]
    return matches if limit is None else matches[:limit]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 2 — get_anachronisms
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_anachronisms(
    entity: Optional[str] = None,
    limit: Optional[int] = None,
    include_flagged: bool = False,
) -> list[Anachronism]:
    """List anachronisms from the most recent run.

    Args:
        entity: Keep only anachronisms whose ``entity_name`` equals this
            value. ``None`` disables the filter.
        limit: Cap on the result size, applied after the other filters.
            ``None`` means no cap; ``0`` yields an empty list.
        include_flagged: When ``False`` (the default) anachronisms whose
            ``flagged`` attribute is truthy are left out — flagged
            violations are hidden once acknowledged (AC 2.9).

    Returns:
        A new list with the matching anachronisms, in the order they
        appear in the runner's ``last_violations``.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    # Validate before doing any work: used as a slice bound, a negative
    # limit would silently drop items from the *end* of the list instead
    # of capping its size.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    matches = [
        v
        for v in _get_runner().last_violations
        if isinstance(v, Anachronism)
        and (entity is None or v.entity_name == entity)
        and (include_flagged or not v.flagged)
    ]
    return matches if limit is None else matches[:limit]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 3 — get_orphans
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_orphans(
    reason: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[Orphan]:
    """List orphans from the most recent run.

    Args:
        reason: Keep only orphans whose ``reason`` equals this value.
            ``None`` disables the filter.
        limit: Cap on the result size, applied after the ``reason``
            filter. ``None`` means no cap; ``0`` yields an empty list.

    Returns:
        A new list with the matching orphans, in the order they appear
        in the runner's ``last_violations``.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    # Validate before doing any work: used as a slice bound, a negative
    # limit would silently drop items from the *end* of the list instead
    # of capping its size.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    matches = [
        v
        for v in _get_runner().last_violations
        if isinstance(v, Orphan)
        and (reason is None or v.reason == reason)
    ]
    return matches if limit is None else matches[:limit]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 4 — get_ontology_violations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_ontology_violations(
    rule_id: Optional[str] = None,
    severity: Optional[str] = None,
    limit: Optional[int] = None,
) -> list[OntologyViolation]:
    """List ontology violations from the most recent run.

    Args:
        rule_id: Keep only violations whose ``rule_id`` equals this
            value. ``None`` disables the filter.
        severity: Keep only violations whose ``severity`` equals this
            value. ``None`` disables the filter.
        limit: Cap on the result size, applied after the other filters.
            ``None`` means no cap; ``0`` yields an empty list.

    Returns:
        A new list with the matching violations, in the order they
        appear in the runner's ``last_violations``.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    # Validate before doing any work: used as a slice bound, a negative
    # limit would silently drop items from the *end* of the list instead
    # of capping its size.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit!r}")
    matches = [
        v
        for v in _get_runner().last_violations
        if isinstance(v, OntologyViolation)
        and (rule_id is None or v.rule_id == rule_id)
        and (severity is None or v.severity == severity)
    ]
    return matches if limit is None else matches[:limit]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 5 — flag_for_review
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def flag_for_review(node_id: str, reason: str) -> dict:
    """Flag one violation from the most recent run.

    This is the world-builder's "acknowledge" mechanism: the first
    violation in the runner's ``last_violations`` whose ``id`` equals
    ``node_id`` gets ``flagged = True``.

    ``reason`` is the audit string supplied by the world-builder (not by
    the LLM). It is echoed back in the result; this function does not
    store it anywhere else.

    Returns ``{"flagged": True, "node_id": ..., "reason": ...}`` as a
    confirmation. Raises :class:`KeyError` when no violation has that id,
    rather than silently doing nothing.
    """
    violations = _get_runner().last_violations
    target = next((v for v in violations if v.id == node_id), None)
    if target is None:
        raise KeyError(f"no violation with id {node_id!r} in the most recent run")
    target.flagged = True
    return {"flagged": True, "node_id": node_id, "reason": reason}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 6 — explain_violation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def explain_violation(node_id: str) -> dict:
    """Describe one violation from the most recent run as a plain dict.

    Keys present for every kind::

        "node_id":   the violation's id
        "kind":      the violation's class name ("Contradiction",
                     "Anachronism", "Orphan" or "OntologyViolation")
        "subject":   ``subject`` if truthy, else ``entity_name``, else ""
        "predicate": ``predicate`` if truthy, else ``event_name``, else ""
        "severity":  the violation's ``severity``
        "flagged":   the violation's ``flagged``
        "sources":   list copy of ``sources`` (empty when absent)

    Extra keys, depending on the kind::

        Contradiction:      "claim_a", "claim_b"
        Anachronism:        "claim", "expected", "actual"
        OntologyViolation:  "rule_id", "description", "claim"
        Orphan:             "entity_type", "reason"

    ``"description"`` is read from the matching ``RULE_REGISTRY`` entry
    and is ``""`` when the registry has no (truthy) entry for the rule.

    Raises :class:`KeyError` for unknown ``node_id``.
    """
    violations = _get_runner().last_violations
    found = next((v for v in violations if v.id == node_id), None)
    if found is None:
        raise KeyError(f"no violation with id {node_id!r} in the most recent run")

    # Fields shared by every kind. The violation classes name their
    # "who"/"what" attributes differently, hence the getattr fallbacks.
    info: dict = {
        "node_id": found.id,
        "kind": type(found).__name__,
        "subject": getattr(found, "subject", None)
        or getattr(found, "entity_name", ""),
        "predicate": getattr(found, "predicate", "")
        or getattr(found, "event_name", ""),
        "severity": found.severity,
        "flagged": found.flagged,
        "sources": list(getattr(found, "sources", ())),
    }

    # Kind-specific fields; only the first matching branch applies.
    if isinstance(found, Contradiction):
        info.update(claim_a=found.claim_a, claim_b=found.claim_b)
    elif isinstance(found, Anachronism):
        info.update(
            claim=found.claim,
            expected=found.expected,
            actual=found.actual,
        )
    elif isinstance(found, OntologyViolation):
        info["rule_id"] = found.rule_id
        # Surface the rule's description when the rule is still registered.
        registered = RULE_REGISTRY.get(found.rule_id)
        info["description"] = registered.description if registered else ""
        info["claim"] = found.claim
    elif isinstance(found, Orphan):
        info.update(entity_type=found.entity_type, reason=found.reason)
    return info
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 9 — add_ontology_rule
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def add_ontology_rule(
    id: str,
    cypher: str,
    description: str,
    severity: str,
    fn=None,
) -> OntologyRule:
    """Create an ontology rule and add it to ``RULE_REGISTRY``.

    ``fn`` is the rule body used by the in-memory POC. When it is omitted
    a stand-in that takes one argument and returns an empty list is used,
    so the rule is registered as metadata only and emits no violations.

    ``cypher`` is the rule's source string, meant for audit and for the
    Cypher-backed rule body planned for slice 2.6+.

    NOTE(review): ``cypher`` is accepted but is not passed to
    ``OntologyRule`` or stored anywhere by this function — confirm
    whether that is intended.

    Returns the newly registered rule. Raises :class:`ValueError` for a
    duplicate ``id``.
    """
    if id in RULE_REGISTRY:
        raise ValueError(f"rule_id {id!r} is already registered")

    # Fall back to a no-op body so a metadata-only rule is still callable.
    body = (lambda g: []) if fn is None else fn
    rule = OntologyRule(
        rule_id=id,
        description=description,
        severity=severity,
        fn=body,
    )
    RULE_REGISTRY[id] = rule
    return rule
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool 10 — list_ontology_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def list_ontology_rules() -> list[OntologyRule]:
    """List all registered ontology rules.

    Thin wrapper over ``list_rules``: the result is meant to cover the
    10 starter rules plus any user-added ones.
    """
    rules = list_rules()
    return rules
|
|
|
|
|
|
# Public tool surface of this module; the underscore-prefixed singleton
# helpers are intentionally not exported.
__all__ = [
    "add_ontology_rule",
    "explain_violation",
    "flag_for_review",
    "get_anachronisms",
    "get_contradictions",
    "get_ontology_violations",
    "get_orphans",
    "latest_run",
    "list_ontology_rules",
    "run_consistency_check",
]