- plugins/consistency.py: find_contradictions, find_anachronisms, find_orphans,
find_ontology_violations — all stub queries returning the
{violations, count} envelope, ready for T5 to populate violation nodes.
Two of the tools accept a severity filter (any|critical|major|minor).
- neo4j/init.cypher: uniqueness constraints on Contradiction, Anachronism,
Orphan, OntologyViolation (id) + severity/status indexes on the
contradiction/anachronism labels.
- README.md: bump plugin list to five, replace 'no consistency engine' copy
with 'consistency engine is a stub', drop the two T3 bullet points from
the next-steps section.
Verification: /healthz lists 18 tools (was 14), all 4 new tools return
{"violations": [], "count": 0}, full test.sh passes.
211 lines
7.4 KiB
Python
211 lines
7.4 KiB
Python
"""
|
|
Lore Engine POC — minimal MCP-compatible JSON-RPC gateway.
|
|
|
|
Plugin architecture: every .py file in plugins/ is imported at startup.
|
|
A plugin exposes a `register(registry)` function that calls
|
|
registry.tool(name, description, input_schema) as a decorator on each handler to add MCP tools.
|
|
The gateway serves tools/list and tools/call.
|
|
|
|
This is the proof: adding a new tool is a new file in plugins/, not
|
|
a change to server.py. The plugin boundary is data-driven.
|
|
"""
|
|
import importlib
|
|
import importlib.util
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Callable
|
|
|
|
# Make the gateway package importable by plugins regardless of CWD.
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
|
|
# ─── Plugin registry ────────────────────────────────────────────────────────
|
|
|
|
class ToolDef(BaseModel):
    """One registered MCP tool: its advertised metadata plus the handler callable."""

    # Key the tool is stored and looked up under in ToolRegistry.
    name: str
    # Human-readable text returned verbatim by ToolRegistry.list().
    description: str
    # Emitted as "inputSchema" by ToolRegistry.list(); presumably a JSON Schema
    # object describing the tool arguments — confirm against the plugins.
    input_schema: dict
    handler: Callable  # not serialized; in-process only
|
|
|
|
class ToolRegistry:
    """In-process catalogue of MCP tools, keyed by tool name."""

    def __init__(self):
        self._tools: dict[str, ToolDef] = {}

    def tool(self, name: str, description: str, input_schema: dict,
             handler: "Callable | None" = None):
        """Register a handler as an MCP tool.

        Works in two forms:

        * decorator:   ``@registry.tool(name, description, schema)``
        * direct call: ``registry.tool(name, description, schema, handler)``

        Args:
            name: Key the tool is registered under; a later registration with
                the same name replaces the earlier one.
            description: Text reported by ``list()``.
            input_schema: Dict reported by ``list()`` as ``inputSchema``.
            handler: Optional callable to register immediately.

        Returns:
            The decorator when ``handler`` is omitted, otherwise ``handler``
            itself (unchanged).
        """
        def deco(fn: Callable) -> Callable:
            self._tools[name] = ToolDef(
                name=name, description=description,
                input_schema=input_schema, handler=fn
            )
            return fn

        # Direct-call form: register right away instead of handing back the decorator.
        if handler is not None:
            return deco(handler)
        return deco

    def list(self) -> list[dict]:
        """Return the name, description and inputSchema of every registered tool."""
        return [
            {"name": t.name, "description": t.description, "inputSchema": t.input_schema}
            for t in self._tools.values()
        ]

    def call(self, name: str, arguments: dict) -> Any:
        """Invoke the handler registered under ``name`` with ``arguments``.

        Raises:
            KeyError: if no tool with that name is registered.
        """
        if name not in self._tools:
            raise KeyError(f"unknown tool: {name}")
        return self._tools[name].handler(arguments)
|
|
|
|
# Single module-level registry; load_plugins() hands it to each plugin's
# register() and the HTTP handlers read from it.
REGISTRY = ToolRegistry()
|
|
|
|
# ─── Store connections (lazy, shared across plugins) ─────────────────────────
|
|
|
|
def get_neo4j():
    """Create a Neo4j driver from NEO4J_URL / NEO4J_USER / NEO4J_PASSWORD.

    The neo4j package is imported on first use rather than at module import.
    A new driver is built on every call.

    Raises:
        KeyError: if any of the three environment variables is unset.
    """
    from neo4j import GraphDatabase

    uri = os.environ["NEO4J_URL"]
    credentials = (os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"])
    return GraphDatabase.driver(uri, auth=credentials)
|
|
|
|
def get_postgres():
    """Open a new psycopg2 connection to the database named by POSTGRES_URL.

    psycopg2 is imported on first use rather than at module import.

    Raises:
        KeyError: if POSTGRES_URL is unset.
    """
    import psycopg2

    dsn = os.environ["POSTGRES_URL"]
    return psycopg2.connect(dsn)
|
|
|
|
def get_minio():
    """Create a MinIO client from MINIO_URL / MINIO_ACCESS_KEY / MINIO_SECRET_KEY.

    MINIO_URL may be a bare ``host:port`` or carry an ``http://`` or
    ``https://`` scheme. The scheme is stripped because the client takes a
    scheme-less endpoint, and TLS is enabled only for ``https://``. Bare and
    ``http://`` values keep the previous plaintext behaviour.

    Raises:
        KeyError: if any of the three environment variables is unset.
    """
    from minio import Minio

    url = os.environ["MINIO_URL"]
    use_tls = url.startswith("https://")
    endpoint = url.removeprefix("https://" if use_tls else "http://")
    return Minio(
        endpoint,
        access_key=os.environ["MINIO_ACCESS_KEY"],
        secret_key=os.environ["MINIO_SECRET_KEY"],
        secure=use_tls,
    )
|
|
|
|
# ─── Plugin discovery ────────────────────────────────────────────────────────
|
|
|
|
def load_plugins(plugins_dir: str):
    """Import every non-underscore .py file in ``plugins_dir`` and register its tools.

    Files are processed in sorted order. Each is executed as module
    ``plugin_<stem>``; if it defines ``register``, that function is called
    with the shared REGISTRY.

    Args:
        plugins_dir: Directory to scan (not recursive).

    Returns:
        Stems of the plugin files whose ``register`` was invoked, or an empty
        list when ``plugins_dir`` is not a directory.

    Raises:
        Whatever a plugin raises while being executed or registered; such
        errors are not swallowed.
    """
    root = Path(plugins_dir)
    if not root.is_dir():
        return []
    loaded = []
    # Make `from server import REGISTRY, get_neo4j, get_postgres, get_minio`
    # work inside plugin files even though they're loaded via
    # spec_from_file_location (and even when this module runs as __main__).
    sys.modules["server"] = sys.modules[__name__]
    for path in sorted(root.glob("*.py")):
        if path.name.startswith("_"):
            continue
        module_name = f"plugin_{path.stem}"
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            logging.warning(f"could not load plugin {path}")
            continue
        mod = importlib.util.module_from_spec(spec)
        # Publish the module before executing it, as the importlib recipe
        # does, so code that looks itself up via sys.modules[__name__] works.
        sys.modules[module_name] = mod
        try:
            spec.loader.exec_module(mod)
        except BaseException:
            # Do not leave a half-initialised module behind.
            sys.modules.pop(module_name, None)
            raise
        if hasattr(mod, "register"):
            mod.register(REGISTRY)
            loaded.append(path.stem)
    return loaded
|
|
|
|
# ─── HTTP / JSON-RPC server ──────────────────────────────────────────────────
|
|
|
|
# FastAPI application object that the startup hook and route decorators in this
# module attach to.
app = FastAPI(title="Lore Engine POC", version="0.1.0")
|
|
|
|
def _split_cypher(script: str) -> list[str]:
    """Drop whole-line ``//`` comments, then split the rest into ';'-separated statements.

    Deliberately simple: a ';' inside a string literal or a trailing ``//``
    comment after code is not recognised.
    """
    kept = [ln for ln in script.splitlines() if not ln.strip().startswith("//")]
    return [part.strip() for part in "\n".join(kept).split(";") if part.strip()]


@app.on_event("startup")
def _startup():
    """Load plugins, then apply the optional Cypher init script.

    Reads PLUGINS_DIR (default ``/app/plugins``) and INIT_CYPHER from the
    environment. The init script is best-effort: any failure is logged as a
    warning and startup continues.
    """
    plugins_dir = os.environ.get("PLUGINS_DIR", "/app/plugins")
    loaded = load_plugins(plugins_dir)
    logging.info(f"Loaded {len(loaded)} plugins: {loaded}")
    logging.info(f"Registered {len(REGISTRY.list())} tools")

    # Run any pre-shipped schema init scripts (idempotent CREATE CONSTRAINT/INDEX).
    init_cypher = os.environ.get("INIT_CYPHER")
    if not init_cypher or not Path(init_cypher).exists():
        return
    try:
        statements = _split_cypher(Path(init_cypher).read_text(encoding="utf-8"))
        driver = get_neo4j()
        try:
            # One session for the whole script.
            with driver.session() as session:
                for stmt in statements:
                    session.run(stmt)
        finally:
            # Close the driver even when a statement fails.
            driver.close()
        logging.info(f"applied init.cypher from {init_cypher}")
    except Exception as e:
        logging.warning(f"init.cypher load failed: {e}")
|
|
|
|
@app.get("/healthz")
def healthz():
    """Health probe: report status "ok" and the name of every registered tool.

    The response key is "plugins", but its values are tool names taken from
    the registry.
    """
    tool_names = []
    for entry in REGISTRY.list():
        tool_names.append(entry["name"])
    return {"status": "ok", "plugins": tool_names}
|
|
|
|
def _rpc_error(req_id, code: int, message: str):
    """Build an HTTP 400 response carrying a JSON-RPC 2.0 error object."""
    return JSONResponse({
        "jsonrpc": "2.0", "id": req_id,
        "error": {"code": code, "message": message}
    }, status_code=400)


@app.post("/mcp")
async def mcp(request: Request):
    """Minimal MCP JSON-RPC: tools/list and tools/call.

    Malformed requests are answered with JSON-RPC error objects instead of an
    unhandled exception:

    * -32700 when the body is not valid JSON,
    * -32600 when the body is not a JSON object,
    * -32602 when ``params`` is present but not an object,
    * -32601 for any method other than tools/list and tools/call.

    A tool that raises (including an unknown tool name) is reported as a
    normal result with ``isError: true``.
    """
    try:
        body = await request.json()
    except ValueError:
        # Covers JSON decode errors and undecodable bytes.
        return _rpc_error(None, -32700, "parse error: request body is not valid JSON")
    if not isinstance(body, dict):
        return _rpc_error(None, -32600, "invalid request: expected a JSON object")

    method = body.get("method")
    req_id = body.get("id")
    # An explicit `"params": null` is treated like an omitted params member.
    params = body.get("params") or {}
    if not isinstance(params, dict):
        return _rpc_error(req_id, -32602, "invalid params: expected a JSON object")

    if method == "tools/list":
        return JSONResponse({
            "jsonrpc": "2.0", "id": req_id,
            "result": {"tools": REGISTRY.list()}
        })

    if method == "tools/call":
        name = params.get("name")
        # An explicit null for arguments is passed to the handler as {}.
        args = params.get("arguments") or {}
        try:
            result = REGISTRY.call(name, args)
            return JSONResponse({
                "jsonrpc": "2.0", "id": req_id,
                "result": {
                    "content": [{"type": "text", "text": _jsonify(result)}],
                    "isError": False,
                }
            })
        except Exception as e:
            logging.exception("tool call failed")
            return JSONResponse({
                "jsonrpc": "2.0", "id": req_id,
                "result": {
                    "content": [{"type": "text", "text": f"error: {e}"}],
                    "isError": True,
                }
            })

    return _rpc_error(req_id, -32601, f"unknown method: {method}")
|
|
|
|
def _jsonify(obj):
    """Render ``obj`` as 2-space-indented JSON text.

    Values the json module cannot encode natively are converted as follows:
    date/datetime via ``isoformat()``, sets to lists, and anything else
    (e.g. Neo4j DateTime) via ``str()``.
    """
    import datetime as _dt
    import json

    def _fallback(value):
        if isinstance(value, set):
            return list(value)
        # datetime is a subclass of date, so one check covers both.
        if isinstance(value, _dt.date):
            return value.isoformat()
        return str(value)

    return json.dumps(obj, indent=2, default=_fallback)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8765.
    # uvicorn is imported here so importing this module does not require it.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8765, log_level="info")
|