Files
lore-engine-poc/gateway/server.py
hermes 8e8503e8f9 T3: consistency plugin skeleton (4 violation tools, 4 Neo4j labels)
- plugins/consistency.py: find_contradictions, find_anachronisms, find_orphans,
  find_ontology_violations — all stub queries returning the
  {violations, count} envelope, ready for T5 to populate violation nodes.
  Two of the tools accept a severity filter (any|critical|major|minor).
- neo4j/init.cypher: uniqueness constraints on Contradiction, Anachronism,
  Orphan, OntologyViolation (id) + severity/status indexes on the
  contradiction/anachronism labels.
- README.md: bump plugin list to five, replace 'no consistency engine' copy
  with 'consistency engine is a stub', drop the two T3 bullet points from
  the next-steps section.

Verification: /healthz lists 18 tools (was 14), all 4 new tools return
{"violations": [], "count": 0}, full test.sh passes.
2026-06-16 14:21:48 +00:00

211 lines
7.4 KiB
Python

"""
Lore Engine POC — minimal MCP-compatible JSON-RPC gateway.
Plugin architecture: every .py file in plugins/ is imported at startup.
A plugin exposes a `register(registry)` function that calls
registry.tool(name, description, schema, handler) to add MCP tools.
The gateway serves tools/list and tools/call.
This is the proof: adding a new tool is a new file in plugins/, not
a change to server.py. The plugin boundary is data-driven.
"""
import importlib
import importlib.util
import logging
import os
import sys
from pathlib import Path
from typing import Any, Callable
# Make the gateway package importable by plugins regardless of CWD.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# ─── Plugin registry ────────────────────────────────────────────────────────
class ToolDef(BaseModel):
name: str
description: str
input_schema: dict
handler: Callable # not serialized; in-process only
class ToolRegistry:
def __init__(self):
self._tools: dict[str, ToolDef] = {}
def tool(self, name: str, description: str, input_schema: dict):
"""Decorator: register a handler as an MCP tool."""
def deco(fn: Callable) -> Callable:
self._tools[name] = ToolDef(
name=name, description=description,
input_schema=input_schema, handler=fn
)
return fn
return deco
def list(self) -> list[dict]:
return [
{"name": t.name, "description": t.description, "inputSchema": t.input_schema}
for t in self._tools.values()
]
def call(self, name: str, arguments: dict) -> Any:
if name not in self._tools:
raise KeyError(f"unknown tool: {name}")
return self._tools[name].handler(arguments)
REGISTRY = ToolRegistry()
# ─── Store connections (lazy, shared across plugins) ─────────────────────────
def get_neo4j():
from neo4j import GraphDatabase
return GraphDatabase.driver(
os.environ["NEO4J_URL"],
auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"])
)
def get_postgres():
import psycopg2
return psycopg2.connect(os.environ["POSTGRES_URL"])
def get_minio():
from minio import Minio
return Minio(
os.environ["MINIO_URL"].replace("http://", ""),
access_key=os.environ["MINIO_ACCESS_KEY"],
secret_key=os.environ["MINIO_SECRET_KEY"],
secure=False,
)
# ─── Plugin discovery ────────────────────────────────────────────────────────
def load_plugins(plugins_dir: str):
"""Import every .py file in plugins/ — each one calls register(REGISTRY)."""
p = Path(plugins_dir)
if not p.exists():
return []
loaded = []
# Make `from server import REGISTRY, get_neo4j, get_postgres, get_minio`
# work inside plugin files even though they're loaded via spec_from_file_location.
import sys as _sys
_sys.modules["server"] = _sys.modules[__name__]
for f in sorted(p.glob("*.py")):
if f.name.startswith("_"):
continue
spec = importlib.util.spec_from_file_location(f"plugin_{f.stem}", f)
if spec is None or spec.loader is None:
logging.warning(f"could not load plugin {f}")
continue
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if hasattr(mod, "register"):
mod.register(REGISTRY)
loaded.append(f.stem)
return loaded
# ─── HTTP / JSON-RPC server ──────────────────────────────────────────────────
app = FastAPI(title="Lore Engine POC", version="0.1.0")
@app.on_event("startup")
def _startup():
plugins_dir = os.environ.get("PLUGINS_DIR", "/app/plugins")
loaded = load_plugins(plugins_dir)
logging.info(f"Loaded {len(loaded)} plugins: {loaded}")
logging.info(f"Registered {len(REGISTRY.list())} tools")
# Run any pre-shipped schema init scripts (idempotent CREATE CONSTRAINT/INDEX).
init_cypher = os.environ.get("INIT_CYPHER")
if init_cypher and Path(init_cypher).exists():
try:
from neo4j import GraphDatabase
d = GraphDatabase.driver(
os.environ["NEO4J_URL"],
auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"])
)
with d.session() as s, open(init_cypher) as f:
cypher = f.read()
# Strip line comments (// ...) before splitting on ';'
cleaned_lines = []
for line in cypher.splitlines():
stripped = line.strip()
if stripped.startswith("//"):
continue
cleaned_lines.append(line)
cleaned = "\n".join(cleaned_lines)
with d.session() as s:
for stmt in [s.strip() for s in cleaned.split(";") if s.strip()]:
s.run(stmt)
d.close()
logging.info(f"applied init.cypher from {init_cypher}")
except Exception as e:
logging.warning(f"init.cypher load failed: {e}")
@app.get("/healthz")
def healthz():
return {
"status": "ok",
"plugins": [t["name"] for t in REGISTRY.list()],
}
@app.post("/mcp")
async def mcp(request: Request):
"""Minimal MCP JSON-RPC: tools/list and tools/call."""
body = await request.json()
method = body.get("method")
req_id = body.get("id")
params = body.get("params", {})
if method == "tools/list":
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {"tools": REGISTRY.list()}
})
if method == "tools/call":
name = params.get("name")
args = params.get("arguments", {})
try:
result = REGISTRY.call(name, args)
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {
"content": [{"type": "text", "text": _jsonify(result)}],
"isError": False,
}
})
except Exception as e:
logging.exception("tool call failed")
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {
"content": [{"type": "text", "text": f"error: {e}"}],
"isError": True,
}
})
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"error": {"code": -32601, "message": f"unknown method: {method}"}
}, status_code=400)
def _jsonify(obj):
"""JSON serializer that handles datetime, sets, and Neo4j DateTime."""
import json
from datetime import datetime, date
def default(o):
if isinstance(o, (datetime, date)):
return o.isoformat()
if isinstance(o, set):
return list(o)
return str(o)
return json.dumps(obj, default=default, indent=2)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8765, log_level="info")