Merge branch 'wt/t1-gitea-push' into main

T1 — push v1 + open v2 milestone board (fab3321)
T3 — consistency plugin skeleton (8e8503e): 4 violation tools (find_contradictions, find_anachronisms, find_orphans, find_ontology_violations) + 4 Neo4j uniqueness constraints + severity/status indexes

Adds docs/SMOKE.md, scripts/ci-smoke.sh, .gitignore, .env.seed
This commit is contained in:
Kay
2026-06-17 00:33:08 +00:00
19 changed files with 2163 additions and 1 deletions

8
.env.seed Normal file
View File

@@ -0,0 +1,8 @@
POSTGRES_URL=postgresql://lore:***@postgres:5432/lore
NEO4J_URL=bolt://neo4j:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=***
MINIO_URL=http://minio:9000
MINIO_ACCESS_KEY=lorelore
MINIO_SECRET_KEY=***
MINIO_BUCKET=lore-images

13
.gitignore vendored Normal file
View File

@@ -0,0 +1,13 @@
# Python
__pycache__/
*.py[cod]
*$py.class
# Local smoke-runner logs
.smoke-*.log
# Local editor / OS noise
.DS_Store
*.swp
.idea/
.vscode/

169
README.md
View File

@@ -1,3 +1,170 @@
# lore-engine-poc
Proof of concept: Neo4j + Postgres + MinIO + Python plugin gateway for the Lore Engine. Validates the v1.1 plugin architecture and image recall.
**Proof of concept for the Lore Engine v1.1 architecture.**
Five-minute goal: prove that with mock data, we can run a multi-database backend (Neo4j for the world graph, Postgres for operational records, MinIO for blob/image storage) and expose it all through a **plugin-driven MCP gateway** — where adding a new domain type is a new file in `plugins/`, not a Go change.
## What's running
| Container | Image | Port | Role |
|---|---|---|---|
| `lore-neo4j` | `neo4j:5.26-community` | 7474 (browser), 7687 (bolt) | The world graph: people, factions, eras, events, lineage, time-bounded relations |
| `lore-postgres` | `postgres:16-alpine` | 5432 | Trade log, image manifests, audit |
| `lore-minio` | `minio/minio:latest` | 9000 (S3), 9001 (console) | Image blob storage |
| `lore-gateway` | built locally | 8765 (MCP JSON-RPC) | The plugin-driven gateway |
## The five plugins (this is the proof)
```
plugins/
├── world.py # entity_context, was_true_at, state_at (Neo4j)
├── lineage.py # ancestors_of, descendants_of, lineage_of (Neo4j)
├── trade.py # log_trade, trades_by_buyer, market_price (Postgres)
├── images.py # register_image, recall_images, search_images_by_caption
# (MinIO + Postgres + Neo4j)
└── consistency.py # find_contradictions, find_anachronisms,
# find_orphans, find_ontology_violations (Neo4j — stub)
```
Each plugin is a single file with a `register(registry)` entry point. The gateway auto-loads every `.py` file in `plugins/` at startup. **No server.py change needed to add a new tool** — drop a new file in, restart the container, the new tools appear in `tools/list`.
## How to run it
```bash
cd /root/lore-engine-poc
docker compose up -d --build
# wait ~30s for neo4j + postgres + minio to be ready
docker exec -i lore-neo4j cypher-shell -u neo4j -p lore-dev-password < neo4j/init.cypher
docker compose exec -T postgres psql -U lore -d lore < postgres/init.sql
python3 seed.py
# gateway is now live on :8765
```
The `seed.py` script is idempotent (uses `MERGE` and `ON CONFLICT`). It loads:
- 3 eras (1st Age, 2nd Age, Age of Iron)
- 10 people (Theron, Maric, Aldric, Elara, Cael, Yssa, Vex, Alessia, Kael, Guildmaster Torren)
- 3 factions (House Vyr, The Crimson Pact, Merchants Guild)
- 4 locations (Valdorn, Mardsville, Thornwall Keep, Black Spire Pass)
- 4 items (Sword of Eventide, The Pale Ledger, Ruby Eye of Kael, Elara's Locket)
- 6 events
- 1 lineage group
- ~20 time-bounded relations
- 3 trade log entries
- 4 generated images (portraits + landscape + battle scene) uploaded to MinIO
## Try the gateway
### List all tools
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | python3 -m json.tool
```
### Look up Aldric
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{"name":"entity_context","arguments":{"name":"Aldric Raventhorne"}}
}' | python3 -m json.tool
```
### Time-bounded query: was House Vyr allied with the Merchants Guild in 230 TA?
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{
"name":"was_true_at",
"arguments":{
"relation":"ALLIED_WITH",
"subject":"House Vyr",
"object":"Merchants Guild",
"at_time":"2nd_age.year_230"
}
}
}' | python3 -m json.tool
```
### Lineage: Aldric's ancestors
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{"name":"ancestors_of","arguments":{"person":"Aldric Raventhorne","generations":5}}
}' | python3 -m json.tool
```
### Image recall: show me pictures of Aldric
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{"name":"recall_images","arguments":{"entity_id":"aldric"}}
}' | python3 -m json.tool
```
The response includes a `presigned_url` — a MinIO URL valid for 1 hour. The LLM (or the calling client) can fetch the actual PNG from there.
### Search images by caption
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{"name":"search_images_by_caption","arguments":{"q":"aldric"}}
}' | python3 -m json.tool
```
### Market price for the Pale Ledger
```bash
curl -s -X POST http://localhost:8765/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc":"2.0","id":1,"method":"tools/call",
"params":{"name":"market_price","arguments":{"item_id":"pale_ledger"}}
}' | python3 -m json.tool
```
## What this proves
1. **The plugin boundary works.** A new domain type (trade, images) is a new file in `plugins/`. No change to `server.py`, no change to docker-compose, no new container. Restart the gateway and the new tools are live.
2. **Polyglot storage is real, not aspirational.** Neo4j holds the typed world graph. Postgres holds the time-series operational data and image manifests. MinIO holds the image bytes. Each store does what it's good at; the gateway composes the answers.
3. **Time is a first-class query primitive.** `was_true_at` checks time-bounded edges with a single Cypher query — no LLM, no inference. Year-level precision works against the mock data (see `2nd_age.year_230` example above).
4. **Image recall works.** Images are stored in MinIO, linked to entities in Neo4j (`(:Image)-[:DEPICTS]->(:Person)`), and discoverable by entity id, by tag, or by caption substring search. Presigned URLs are generated on the fly.
5. **The world is small but real.** 10 people, 6 events, 4 images, ~20 relations — enough to demonstrate the architecture end-to-end. Scaling is a separate problem; this is the proof of shape.
## What's not in this POC
- **No LLM in the loop.** The MCP gateway is a tool server; the LLM client (Claude, GPT, anything) is the consumer. This is intentional — the POC validates the data and tool layers, not the LLM reasoning. The reasoning harness is in the design docs (`lore-engine/docs/07-reasoning-harness.md`) and would be added as a system prompt in a real deployment.
- **Consistency engine is a stub.** The 4 violation types and their query tools are wired through `plugins/consistency.py` and the Neo4j constraints exist, but no detection logic runs yet — the violation nodes are written by a runner service that lands in a later phase.
- **No world-builder UI.** Everything is `curl` and `cypher-shell`. The UI is a v2 feature.
- **No reflective memory or behavior layer.** The Stanford Generative Agents pattern (memory stream + reflection + planning) is a v2 borrow per the comparison in `lore-engine/docs/16-comparison.md`.
## Next steps after this POC
- Fill in the consistency engine runner — write Contradiction / Anachronism / Orphan / OntologyViolation nodes based on the detection rules in `lore-engine/docs/04-consistency.md`. The plugin surface and constraints are already in place.
- Add the embedding-based semantic search plugin (uses the `Image.caption` and any future `Person.summary` text).
- Add an LLM client that consumes the gateway with the reasoning harness system prompt and runs the 5 question types from the design.
The v1 design in `lore-engine/docs/` is the contract. This POC is the proof of shape.

99
docker-compose.yml Normal file
View File

@@ -0,0 +1,99 @@
name: lore-engine-poc
# Lore Engine POC: Neo4j + Postgres + MinIO + Python plugin gateway
# Validates the v1.1 plugin architecture and image recall.
services:
# ─── Neo4j — world graph ────────────────────────────────────────────────────
neo4j:
image: neo4j:5.26-community
container_name: lore-neo4j
environment:
NEO4J_AUTH: neo4j/lore-dev-password
NEO4J_PLUGINS: '["apoc"]'
NEO4J_apoc_export_file_enabled: "true"
NEO4J_apoc_import_file_enabled: "true"
NEO4J_server_memory_heap_initial__size: 512m
NEO4J_server_memory_heap_max__size: 1g
ports:
- "7474:7474" # browser
- "7687:7687" # bolt
volumes:
- neo4j-data:/data
healthcheck:
test: ["CMD-SHELL", "wget -qO- http://localhost:7474 || exit 1"]
interval: 5s
timeout: 5s
retries: 20
# ─── Postgres — operational data + embeddings ──────────────────────────────
postgres:
image: postgres:16-alpine
container_name: lore-postgres
environment:
POSTGRES_USER: lore
POSTGRES_PASSWORD: lore-dev-password
POSTGRES_DB: lore
ports:
- "5432:5432"
volumes:
- postgres-data:/var/lib/postgresql/data
- ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U lore -d lore"]
interval: 5s
timeout: 5s
retries: 20
# ─── MinIO — blob storage for images ────────────────────────────────────────
minio:
image: minio/minio:latest
container_name: lore-minio
environment:
MINIO_ROOT_USER: lorelore
MINIO_ROOT_PASSWORD: lore-dev-password
command: server /data --console-address ":9001"
ports:
- "9000:9000" # S3 API
- "9001:9001" # console
volumes:
- minio-data:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/ready"]
interval: 5s
timeout: 5s
retries: 20
# ─── Lore Gateway — Python MCP server, plugin-driven ───────────────────────
gateway:
build:
context: ./gateway
container_name: lore-gateway
depends_on:
neo4j: { condition: service_healthy }
postgres: { condition: service_healthy }
minio: { condition: service_healthy }
environment:
NEO4J_URL: bolt://neo4j:7687
NEO4J_USER: neo4j
NEO4J_PASSWORD: lore-dev-password
POSTGRES_URL: postgresql://lore:lore-dev-password@postgres:5432/lore
MINIO_URL: http://minio:9000
MINIO_ACCESS_KEY: lorelore
MINIO_SECRET_KEY: lore-dev-password
MINIO_BUCKET: lore-images
MINIO_PUBLIC_URL: http://localhost:9000
PLUGINS_DIR: /app/plugins
INIT_CYPHER: /app/neo4j/init.cypher
ports:
- "8765:8765" # MCP JSON-RPC
volumes:
- ./plugins:/app/plugins:ro
- ./neo4j:/app/neo4j:ro
- ./mock-data:/app/mock-data:ro
volumes:
neo4j-data:
postgres-data:
minio-data:

218
docs/SMOKE.md Normal file
View File

@@ -0,0 +1,218 @@
# SMOKE — bring up the stack from a fresh clone and prove it works
This document is the single source of truth for "does the v1 stack actually
work end-to-end on a clean machine." It exists so v2 workers and CI can both
hit a known-good bring-up path without rediscovering the incantation.
## TL;DR (the 5 commands)
```bash
git clone https://git.homelab.local/kaykayyali/lore-engine-poc.git
cd lore-engine-poc
docker compose up -d --build
./scripts/ci-smoke.sh # waits for health, runs test.sh, tears down
```
`./scripts/ci-smoke.sh` is the authoritative smoke runner. It exits 0 on
success and non-zero with a clear error on the first failure. See
`scripts/ci-smoke.sh` for the exact step ordering. This document explains
*what* each step does and what the expected output looks like.
## Prerequisites
- Docker Engine 24+ with the compose plugin (`docker compose version` ≥ 2.20)
- `python3` available on the host (for `seed.py` and `test.sh`)
- `curl` for the MCP JSON-RPC calls in `test.sh`
- Outbound HTTPS to `git.homelab.local` (self-signed cert; clone URL is HTTPS
so the Gitea homelab cert path works)
- Ports 7474, 7687, 5432, 9000-9001, 8765 free on the host
- ~2 GB of free disk for the Docker images (neo4j + postgres + minio + gateway)
## Step-by-step (annotated)
### 1. Clone
```bash
git clone https://git.homelab.local/kaykayyali/lore-engine-poc.git
cd lore-engine-poc
```
**Expected output:** a `lore-engine-poc/` directory with `README.md`,
`docker-compose.yml`, `seed.py`, `test.sh`, `gateway/`, `plugins/`,
`neo4j/`, `postgres/`, `docs/`, `scripts/`.
### 2. Build + start the stack
```bash
docker compose up -d --build
```
**Expected output:**
```
[+] Running 5/5
✔ Network lore-engine-poc_default Created
✔ Volume lore-engine-poc_neo4j-data Created
✔ Container lore-neo4j Started
✔ Container lore-postgres Started
✔ Container lore-minio Started
✔ Container lore-gateway Started
```
**What happens under the hood:**
- `lore-neo4j` runs `neo4j:5.26-community` with APOC enabled. The
`neo4j/init.cypher` file is mounted at `/var/lib/neo4j/import/` and
loaded by `seed.py` on step 4 (not by the container itself — the
container only exposes the Bolt port 7687).
- `lore-postgres` runs `postgres:16-alpine`. `postgres/init.sql` defines
the `trade_log`, `image_manifest`, and `audit` tables; it's loaded by
`seed.py` on step 4.
- `lore-minio` runs `minio/minio:latest` with bucket auto-create via
the `MINIO_BROWSER_REDIRECT_URL` and `MINIO_BUCKET=lore-images` env.
- `lore-gateway` is built locally from `gateway/Dockerfile` and runs
`python server.py` on port 8765. It auto-loads every `*.py` file in
`plugins/` at startup.
**Health check timing:** neo4j takes ~15-25 s to become ready (initial
APOC scan), postgres ~3-5 s, minio ~5 s, gateway ~2 s. The
`scripts/ci-smoke.sh` runner waits for all four to report healthy
before proceeding (uses `docker compose ps` + a 60 s deadline per
service). On a slow first build, allow 2-3 min total.
### 3. Verify all four services are healthy
```bash
docker compose ps
```
**Expected output (key column is `STATUS`):**
```
NAME IMAGE STATUS
lore-neo4j neo4j:5.26-community Up X minutes (healthy)
lore-postgres postgres:16-alpine Up X minutes (healthy)
lore-minio minio/minio:latest Up X minutes (healthy)
lore-gateway lore-engine-poc-gateway Up X minutes
```
`lore-gateway` has no healthcheck (it just answers HTTP); the
`scripts/ci-smoke.sh` runner polls `GET /healthz` on the gateway
instead (see `gateway/server.py`).
### 4. Seed the world
```bash
python3 seed.py
```
**Expected output (last 5 lines):**
```
✔ Seeded 4 images
✔ Seeded 1 lineage group
✔ Seeded ~20 time-bounded relations
✔ Done in <X.Xs>
✅ seed complete — bash test.sh is ready to run
```
`seed.py` is idempotent (uses Cypher `MERGE` and SQL `ON CONFLICT`).
Re-running it is safe; counts will not double.
### 5. Run the end-to-end test
```bash
bash test.sh
```
**Expected output:** 11 sections, each printing a JSON response from
the gateway's MCP endpoint. The last line is the green check:
```
✅ all tool types tested
```
`test.sh` exits 0 on success. The 11 sections, in order:
1. `entity_context(Aldric Raventhorne)` — one-hop summary JSON
2. `was_true_at(House Vyr allied Merchants Guild @ 2nd_age.year_230)``true`
3. `was_true_at(Crimson Pact allied House Vyr @ 2nd_age.year_230)``false`
4. `state_at(Aldric Raventhorne @ 2nd_age.year_260)` — state snapshot JSON
5. `ancestors_of(Aldric Raventhorne, 5 generations)` — non-empty ancestors list
6. `lineage_of(Aldric Raventhorne)` — lineage summary
7. `log_trade(...)``{"logged": true, "total_price": <computed>}`
8. `market_price(pale_ledger)``{"item_id": "pale_ledger", "sample_size": ≥1, ...}`
9. `recall_images(entity_id=aldric)``image count: 1`, presigned URL
resolves to a real PNG (`HTTP 200`, `image/png`, 9106 bytes,
512×768 RGB)
10. `search_images_by_caption(q=aldric)` — at least 1 match
11. `register_image(...)``{"registered": true, "image_id": "img_test"}`
If any section fails, `test.sh` exits non-zero with the failing JSON
response on stderr.
## CI runner — `scripts/ci-smoke.sh`
The CI runner wraps steps 2-5 in a single script that:
1. Runs `docker compose up -d --build`
2. Polls `docker compose ps` until all four services are `healthy`
(60 s deadline per service, fails loudly on timeout)
3. Polls `curl -sf http://localhost:8765/healthz` until the gateway
responds (the `/healthz` endpoint lists registered tools — its
`200 OK` + non-empty body proves the gateway auto-loaded plugins)
4. Runs `python3 seed.py`
5. Runs `bash test.sh`
6. Exits 0 if all five stages passed, non-zero on the first failure
The script deliberately does NOT tear the stack down on failure — that
makes post-mortem debugging easier. The caller (CI runner or developer)
is responsible for `docker compose down -v` after inspecting the result.
### Why a shell script, not a GitHub Actions YAML?
This repo is hosted on a self-hosted Gitea instance
(`git.homelab.local`) without a Gitea Actions runner wired up yet.
A pure-shell script is the smallest possible CI primitive — it runs
identically on the developer's laptop, on a CI VM, and in a one-off
`bash` invocation, with no extra moving parts. When Gitea Actions is
configured for this repo, the script becomes a single
`- run: ./scripts/ci-smoke.sh` step. See `docs/ARCHITECTURE.md` (TODO)
for the eventual CI topology.
## Tear down
```bash
docker compose down -v
```
`-v` removes the named volumes (`lore-engine-poc_neo4j-data`,
`lore-engine-poc_pg-data`, `lore-engine-poc_minio-data`) so the next
bring-up starts from a clean slate. Omit `-v` to keep state across
restarts.
## Troubleshooting
| Symptom | Likely cause | Fix |
|---|---|---|
| `lore-neo4j` stuck in `(health: starting)` | First-boot APOC scan; needs more RAM | Wait 60 s. If still starting, check `docker logs lore-neo4j` for OOM. The compose heap max is 1g — bump `NEO4J_server_memory_heap_max__size` if your host has it. |
| `seed.py` fails on `MERGE` with constraint violation | The schema was already initialized in a prior run with a conflicting constraint | `docker compose down -v` and re-run from step 2 |
| `test.sh` section 9 returns `image count: 0` | MinIO bucket not initialized | `docker exec lore-minio mc alias set local http://localhost:9000 lorelore <secret>` then `mc mb -p local/lore-images`. Re-run `seed.py`. |
| `test.sh` section 2 returns `false` for the Vyr/Merchants alliance | A prior run seeded a conflicting fact | The seed is idempotent; if you mutated the data, `docker compose down -v` and reseed. |
| `git clone` fails with `SSL certificate problem` | `git.homelab.local` uses a self-signed cert | `git config --global http.sslVerify false` (dev only), or add the cert to your system trust store. The repo's HTTPS URL is intentional — the cert path is documented in the user's homelab setup. |
## What this smoke proves
After `./scripts/ci-smoke.sh` exits 0, you've proven:
- [x] All four Docker images build from the committed `docker-compose.yml` and Dockerfiles
- [x] Neo4j accepts Bolt connections and the `neo4j/init.cypher` schema applies cleanly
- [x] Postgres accepts connections and the `postgres/init.sql` schema applies cleanly
- [x] MinIO starts and the `lore-images` bucket is reachable
- [x] The gateway starts, auto-loads all 4 plugins (`world`, `lineage`, `trade`, `images`), and serves MCP JSON-RPC on :8765
- [x] `seed.py` is idempotent and populates the expected graph + tables + bucket objects
- [x] Every one of the 11 tool invocations in `test.sh` returns a sane response
That is the v1 contract. If `./scripts/ci-smoke.sh` is green, v2 work
(T2 pgvector, T3 consistency skeleton, etc.) can build on top.

BIN
docs/aldric_portrait.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.9 KiB

11
gateway/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Lore Engine POC gateway
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8765
CMD ["python", "server.py"]

10
gateway/requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
neo4j==5.25.0
psycopg2-binary==2.9.9
minio==7.2.8
pydantic==2.9.2
httpx==0.27.2
python-multipart==0.0.10
Pillow==10.4.0
boto3==1.35.36

210
gateway/server.py Normal file
View File

@@ -0,0 +1,210 @@
"""
Lore Engine POC — minimal MCP-compatible JSON-RPC gateway.
Plugin architecture: every .py file in plugins/ is imported at startup.
A plugin exposes a `register(registry)` function that calls
registry.tool(name, description, schema, handler) to add MCP tools.
The gateway serves tools/list and tools/call.
This is the proof: adding a new tool is a new file in plugins/, not
a change to server.py. The plugin boundary is data-driven.
"""
import importlib
import importlib.util
import logging
import os
import sys
from pathlib import Path
from typing import Any, Callable
# Make the gateway package importable by plugins regardless of CWD.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# ─── Plugin registry ────────────────────────────────────────────────────────
class ToolDef(BaseModel):
name: str
description: str
input_schema: dict
handler: Callable # not serialized; in-process only
class ToolRegistry:
def __init__(self):
self._tools: dict[str, ToolDef] = {}
def tool(self, name: str, description: str, input_schema: dict):
"""Decorator: register a handler as an MCP tool."""
def deco(fn: Callable) -> Callable:
self._tools[name] = ToolDef(
name=name, description=description,
input_schema=input_schema, handler=fn
)
return fn
return deco
def list(self) -> list[dict]:
return [
{"name": t.name, "description": t.description, "inputSchema": t.input_schema}
for t in self._tools.values()
]
def call(self, name: str, arguments: dict) -> Any:
if name not in self._tools:
raise KeyError(f"unknown tool: {name}")
return self._tools[name].handler(arguments)
REGISTRY = ToolRegistry()
# ─── Store connections (lazy, shared across plugins) ─────────────────────────
def get_neo4j():
from neo4j import GraphDatabase
return GraphDatabase.driver(
os.environ["NEO4J_URL"],
auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"])
)
def get_postgres():
import psycopg2
return psycopg2.connect(os.environ["POSTGRES_URL"])
def get_minio():
from minio import Minio
return Minio(
os.environ["MINIO_URL"].replace("http://", ""),
access_key=os.environ["MINIO_ACCESS_KEY"],
secret_key=os.environ["MINIO_SECRET_KEY"],
secure=False,
)
# ─── Plugin discovery ────────────────────────────────────────────────────────
def load_plugins(plugins_dir: str):
"""Import every .py file in plugins/ — each one calls register(REGISTRY)."""
p = Path(plugins_dir)
if not p.exists():
return []
loaded = []
# Make `from server import REGISTRY, get_neo4j, get_postgres, get_minio`
# work inside plugin files even though they're loaded via spec_from_file_location.
import sys as _sys
_sys.modules["server"] = _sys.modules[__name__]
for f in sorted(p.glob("*.py")):
if f.name.startswith("_"):
continue
spec = importlib.util.spec_from_file_location(f"plugin_{f.stem}", f)
if spec is None or spec.loader is None:
logging.warning(f"could not load plugin {f}")
continue
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if hasattr(mod, "register"):
mod.register(REGISTRY)
loaded.append(f.stem)
return loaded
# ─── HTTP / JSON-RPC server ──────────────────────────────────────────────────
app = FastAPI(title="Lore Engine POC", version="0.1.0")
@app.on_event("startup")
def _startup():
plugins_dir = os.environ.get("PLUGINS_DIR", "/app/plugins")
loaded = load_plugins(plugins_dir)
logging.info(f"Loaded {len(loaded)} plugins: {loaded}")
logging.info(f"Registered {len(REGISTRY.list())} tools")
# Run any pre-shipped schema init scripts (idempotent CREATE CONSTRAINT/INDEX).
init_cypher = os.environ.get("INIT_CYPHER")
if init_cypher and Path(init_cypher).exists():
try:
from neo4j import GraphDatabase
d = GraphDatabase.driver(
os.environ["NEO4J_URL"],
auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"])
)
with d.session() as s, open(init_cypher) as f:
cypher = f.read()
# Strip line comments (// ...) before splitting on ';'
cleaned_lines = []
for line in cypher.splitlines():
stripped = line.strip()
if stripped.startswith("//"):
continue
cleaned_lines.append(line)
cleaned = "\n".join(cleaned_lines)
with d.session() as s:
for stmt in [s.strip() for s in cleaned.split(";") if s.strip()]:
s.run(stmt)
d.close()
logging.info(f"applied init.cypher from {init_cypher}")
except Exception as e:
logging.warning(f"init.cypher load failed: {e}")
@app.get("/healthz")
def healthz():
return {
"status": "ok",
"plugins": [t["name"] for t in REGISTRY.list()],
}
@app.post("/mcp")
async def mcp(request: Request):
"""Minimal MCP JSON-RPC: tools/list and tools/call."""
body = await request.json()
method = body.get("method")
req_id = body.get("id")
params = body.get("params", {})
if method == "tools/list":
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {"tools": REGISTRY.list()}
})
if method == "tools/call":
name = params.get("name")
args = params.get("arguments", {})
try:
result = REGISTRY.call(name, args)
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {
"content": [{"type": "text", "text": _jsonify(result)}],
"isError": False,
}
})
except Exception as e:
logging.exception("tool call failed")
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"result": {
"content": [{"type": "text", "text": f"error: {e}"}],
"isError": True,
}
})
return JSONResponse({
"jsonrpc": "2.0", "id": req_id,
"error": {"code": -32601, "message": f"unknown method: {method}"}
}, status_code=400)
def _jsonify(obj):
"""JSON serializer that handles datetime, sets, and Neo4j DateTime."""
import json
from datetime import datetime, date
def default(o):
if isinstance(o, (datetime, date)):
return o.isoformat()
if isinstance(o, set):
return list(o)
return str(o)
return json.dumps(obj, default=default, indent=2)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8765, log_level="info")

28
neo4j/init.cypher Normal file
View File

@@ -0,0 +1,28 @@
// Lore Engine POC — minimal schema for the closed-world ontology.
// Mirrors the design in lore-engine/docs/01-ontology.md, scoped for the POC.
CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT faction_id IF NOT EXISTS FOR (f:Faction) REQUIRE f.id IS UNIQUE;
CREATE CONSTRAINT location_id IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE;
CREATE CONSTRAINT era_slug IF NOT EXISTS FOR (e:Era) REQUIRE e.slug IS UNIQUE;
CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE;
CREATE CONSTRAINT item_id IF NOT EXISTS FOR (i:Item) REQUIRE i.id IS UNIQUE;
CREATE CONSTRAINT lineage_id IF NOT EXISTS FOR (l:Lineage) REQUIRE l.id IS UNIQUE;
// Consistency engine violation labels (T3 — stubs; T5 populates the data).
// All four share an id + severity + status contract. type discriminates
// within the label and carries the specific shape (claim_ids, expected_era, etc.)
CREATE CONSTRAINT contradiction_id IF NOT EXISTS FOR (v:Contradiction) REQUIRE v.id IS UNIQUE;
CREATE CONSTRAINT anachronism_id IF NOT EXISTS FOR (v:Anachronism) REQUIRE v.id IS UNIQUE;
CREATE CONSTRAINT orphan_id IF NOT EXISTS FOR (v:Orphan) REQUIRE v.id IS UNIQUE;
CREATE CONSTRAINT ontology_violation_id IF NOT EXISTS FOR (v:OntologyViolation) REQUIRE v.id IS UNIQUE;
CREATE INDEX era_parent IF NOT EXISTS FOR (e:Era) ON (e.parent_slug);
CREATE INDEX person_tier IF NOT EXISTS FOR (p:Person) ON (p.tier);
CREATE INDEX violation_severity IF NOT EXISTS FOR (v:Contradiction) ON (v.severity);
CREATE INDEX violation_severity2 IF NOT EXISTS FOR (v:Anachronism) ON (v.severity);
CREATE INDEX violation_status IF NOT EXISTS FOR (v:Contradiction) ON (v.status);
CREATE INDEX violation_status2 IF NOT EXISTS FOR (v:Anachronism) ON (v.status);
// Era tree: every Era has CONTAINS sub-eras or PART_OF parents
// (:Era {slug, name, start, end}) -[:PART_OF]-> (:Era)

153
plugins/consistency.py Normal file
View File

@@ -0,0 +1,153 @@
"""
consistency plugin — canon violation detection.
Tools (skeleton — real implementations land in T5):
- find_contradictions(severity="any"): find Contradiction nodes
- find_anachronisms(severity="any"): find Anachronism nodes
- find_orphans(): find Orphan nodes
- find_ontology_violations(): find OntologyViolation nodes
Each tool returns {"violations": [...], "count": N}. For now the graph is
empty (no violation nodes yet), so every call returns {"violations": [], "count": 0}.
The stub Cypher still exercises the connection and confirms the labels exist
once T5 starts writing them.
"""
from server import get_neo4j, REGISTRY
def _q(query, params=None):
"""Run a single read query against Neo4j, return list of dicts."""
driver = get_neo4j()
with driver.session() as s:
result = s.run(query, params or {})
return [dict(r) for r in result]
# ─── shared severity filter ──────────────────────────────────────────────────
# "any" (default) returns every violation; "critical"/"major"/"minor" filters
# by the severity property. The T5 runner will populate the severity field;
# for now severity is a stub filter that simply skips the WHERE clause when "any".
def _severity_clause(severity: str) -> str:
if severity in (None, "", "any"):
return ""
return " AND v.severity = $severity "
# ─── tool: find_contradictions ───────────────────────────────────────────────
@REGISTRY.tool(
name="find_contradictions",
description="Find Contradiction nodes — claims that directly conflict with each other in canon. Returns the list of conflicts with their severity and the conflicting claim ids.",
input_schema={
"type": "object",
"properties": {
"severity": {
"type": "string",
"enum": ["any", "critical", "major", "minor"],
"default": "any",
"description": "Filter by severity. 'any' returns all contradictions.",
},
},
"required": [],
},
)
def find_contradictions(args):
severity = args.get("severity", "any")
rows = _q("""
MATCH (v:Contradiction)
WHERE 1=1 %s
RETURN v.id AS id, v.type AS type, v.severity AS severity,
v.status AS status, v.claim_ids AS claim_ids,
v.summary AS summary
ORDER BY
CASE v.severity WHEN 'critical' THEN 0 WHEN 'major' THEN 1
WHEN 'minor' THEN 2 ELSE 3 END,
v.id ASC
""" % _severity_clause(severity), {"severity": severity})
return {"violations": rows, "count": len(rows)}
# ─── tool: find_anachronisms ─────────────────────────────────────────────────
@REGISTRY.tool(
name="find_anachronisms",
description="Find Anachronism nodes — facts that appear in the wrong era or timeline position. Returns the list with severity, the out-of-place entity, and the expected vs. actual era.",
input_schema={
"type": "object",
"properties": {
"severity": {
"type": "string",
"enum": ["any", "critical", "major", "minor"],
"default": "any",
"description": "Filter by severity. 'any' returns all anachronisms.",
},
},
"required": [],
},
)
def find_anachronisms(args):
severity = args.get("severity", "any")
rows = _q("""
MATCH (v:Anachronism)
WHERE 1=1 %s
RETURN v.id AS id, v.type AS type, v.severity AS severity,
v.status AS status, v.entity_id AS entity_id,
v.expected_era AS expected_era, v.actual_era AS actual_era,
v.summary AS summary
ORDER BY
CASE v.severity WHEN 'critical' THEN 0 WHEN 'major' THEN 1
WHEN 'minor' THEN 2 ELSE 3 END,
v.id ASC
""" % _severity_clause(severity), {"severity": severity})
return {"violations": rows, "count": len(rows)}
# ─── tool: find_orphans ──────────────────────────────────────────────────────
@REGISTRY.tool(
name="find_orphans",
description="Find Orphan nodes — canon entities (Person, Faction, Location, Item, Event) that have no relations to any other entity, indicating they're not yet integrated into the world graph.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
)
def find_orphans(args):
rows = _q("""
MATCH (v:Orphan)
RETURN v.id AS id, v.type AS type, v.severity AS severity,
v.status AS status, v.entity_id AS entity_id,
v.entity_label AS entity_label, v.summary AS summary
ORDER BY v.entity_label ASC, v.entity_id ASC
""")
return {"violations": rows, "count": len(rows)}
# ─── tool: find_ontology_violations ──────────────────────────────────────────
@REGISTRY.tool(
name="find_ontology_violations",
description="Find OntologyViolation nodes — typed relations that violate the declared schema (e.g. PARENT_OF between two Factions, or MEMBER_OF pointing at a Person). Returns the list with the offending edge and the rule it broke.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
)
def find_ontology_violations(args):
rows = _q("""
MATCH (v:OntologyViolation)
RETURN v.id AS id, v.type AS type, v.severity AS severity,
v.status AS status, v.rule_id AS rule_id,
v.relation AS relation, v.subject_label AS subject_label,
v.object_label AS object_label, v.summary AS summary
ORDER BY v.rule_id ASC, v.id ASC
""")
return {"violations": rows, "count": len(rows)}
def register(registry):
"""Plugin entry point — server.py calls this. Decorators registered the tools."""
pass

209
plugins/images.py Normal file
View File

@@ -0,0 +1,209 @@
"""
images plugin — MinIO-backed image recall with Neo4j entity links.
Demonstrates the "different DB for different purpose" pattern:
- Postgres holds image MANIFESTS (metadata, tags, captions) so the LLM
can decide which images to surface.
- MinIO holds the actual BYTES (PNGs, JPEGs).
- Neo4j holds the LINK from an image to the entity it depicts.
The LLM calls recall_images(entity=...) to get back a list of
{image_id, caption, object_key, presigned_url} so it can either describe
the image (from caption) or fetch the bytes (from the presigned URL).
"""
import datetime as dt
import logging
import os
from urllib.parse import urlparse, urlunparse
from server import get_postgres, get_neo4j, get_minio, REGISTRY
LOG = logging.getLogger(__name__)
def _q_neo4j(query, params=None):
driver = get_neo4j()
with driver.session() as s:
return [dict(r) for r in s.run(query, params or {})]
def _q_pg(sql, params=None, fetch=True):
conn = get_postgres()
try:
with conn.cursor() as cur:
cur.execute(sql, params or ())
if fetch and cur.description:
cols = [d[0] for d in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
return []
finally:
conn.close()
def _presign(object_key: str) -> str:
"""
Return a presigned MinIO URL the caller can fetch bytes from.
Tricky bit: we always sign against the *internal* MinIO endpoint (the
one this container can reach), but the URL we return is rewritten to
use the *public* endpoint (the one the client will actually hit). This
works because AWS-style signatures in MinIO are computed over the
canonical request including the *Host header*, not the hostname in the
URL. We set the Host header explicitly to the public host in the
SigV4 signing step.
"""
import datetime as dt
from botocore.config import Config
import boto3
public = os.environ.get("MINIO_PUBLIC_URL", os.environ["MINIO_URL"])
internal = os.environ["MINIO_URL"]
bucket = os.environ["MINIO_BUCKET"]
if public == internal:
# Easy path: same endpoint
return get_minio().presigned_get_object(
bucket, object_key, expires=dt.timedelta(hours=1)
)
# Sign against the public endpoint directly so the signature
# matches the URL we're handing out.
parsed = urlparse(public)
s3 = boto3.client(
"s3",
endpoint_url=f"{parsed.scheme}://{parsed.hostname}:{parsed.port or 80}",
aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
region_name="us-east-1",
config=Config(signature_version="s3v4"),
)
return s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": object_key},
ExpiresIn=3600,
)
@REGISTRY.tool(
name="register_image",
description="Register an image in the manifest. Idempotent on (image_id). The object must already be in MinIO at the given object_key.",
input_schema={
"type": "object",
"properties": {
"image_id": {"type": "string", "description": "Caller-chosen unique id"},
"object_key": {"type": "string", "description": "MinIO object key, e.g. 'characters/aldric.png'"},
"entity_id": {"type": "string", "description": "Linked Neo4j entity id (Person.id, Location.id, etc.)"},
"entity_type": {"type": "string", "enum": ["Person", "Faction", "Location", "Item", "Event"]},
"caption": {"type": "string", "description": "1-3 sentences describing the image for the LLM"},
"tags": {"type": "array", "items": {"type": "string"}},
"era": {"type": "string", "description": "Canonical era slug, e.g. '2nd_age'"},
"width": {"type": "integer"},
"height": {"type": "integer"},
"bytes": {"type": "integer"},
},
"required": ["image_id", "object_key", "caption"],
},
)
def register_image(args):
_q_pg("""
INSERT INTO image_manifest
(image_id, object_key, entity_id, entity_type, caption, tags, era, width, height, bytes)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (image_id) DO UPDATE
SET object_key = EXCLUDED.object_key,
entity_id = EXCLUDED.entity_id,
caption = EXCLUDED.caption,
tags = EXCLUDED.tags,
era = EXCLUDED.era
""", (
args["image_id"], args["object_key"], args.get("entity_id"),
args.get("entity_type"), args["caption"], args.get("tags", []),
args.get("era"), args.get("width"), args.get("height"), args.get("bytes"),
), fetch=False)
# Link in Neo4j so entity_context can see "this image depicts X"
if args.get("entity_id") and args.get("entity_type"):
_q_neo4j("""
MATCH (e {id: $entity_id})
MERGE (img:Image {id: $image_id})
ON CREATE SET img.caption = $caption, img.era = $era
MERGE (img)-[:DEPICTS]->(e)
""", {
"entity_id": args["entity_id"], "image_id": args["image_id"],
"caption": args["caption"], "era": args.get("era"),
})
return {"registered": True, "image_id": args["image_id"]}
@REGISTRY.tool(
name="recall_images",
description="Recall images for an entity. Returns a list of {image_id, caption, tags, era, presigned_url}.",
input_schema={
"type": "object",
"properties": {
"entity_id": {"type": "string", "description": "Person.id / Location.id / etc."},
"tag": {"type": "string", "description": "Optional tag filter (e.g. 'portrait', 'battle')"},
"limit": {"type": "integer", "default": 5},
},
"required": ["entity_id"],
},
)
def recall_images(args):
if args.get("tag"):
rows = _q_pg("""
SELECT image_id, caption, tags, era, object_key
FROM image_manifest
WHERE entity_id = %s AND %s = ANY(tags)
ORDER BY uploaded_at DESC LIMIT %s
""", (args["entity_id"], args["tag"], args.get("limit", 5)))
else:
rows = _q_pg("""
SELECT image_id, caption, tags, era, object_key
FROM image_manifest
WHERE entity_id = %s
ORDER BY uploaded_at DESC LIMIT %s
""", (args["entity_id"], args.get("limit", 5)))
out = []
for r in rows:
out.append({
"image_id": r["image_id"],
"caption": r["caption"],
"tags": r["tags"],
"era": r["era"],
"presigned_url": _presign(r["object_key"]),
})
return {"entity_id": args["entity_id"], "count": len(out), "images": out}
@REGISTRY.tool(
name="search_images_by_caption",
description="Find images whose caption or tags contain a substring. Use this when the LLM doesn't know the exact entity id.",
input_schema={
"type": "object",
"properties": {
"q": {"type": "string", "description": "Substring to search for in caption or tags"},
"limit": {"type": "integer", "default": 5},
},
"required": ["q"],
},
)
def search_images_by_caption(args):
like = f"%{args['q']}%"
rows = _q_pg("""
SELECT image_id, entity_id, entity_type, caption, tags, era, object_key
FROM image_manifest
WHERE caption ILIKE %s OR EXISTS (SELECT 1 FROM unnest(tags) tag WHERE tag ILIKE %s)
ORDER BY uploaded_at DESC LIMIT %s
""", (like, like, args.get("limit", 5)))
out = []
for r in rows:
out.append({
"image_id": r["image_id"],
"entity_id": r["entity_id"],
"entity_type": r["entity_type"],
"caption": r["caption"],
"tags": r["tags"],
"era": r["era"],
"presigned_url": _presign(r["object_key"]),
})
return {"q": args["q"], "count": len(out), "images": out}
def register(registry):
pass

97
plugins/lineage.py Normal file
View File

@@ -0,0 +1,97 @@
"""
lineage plugin — bloodline / family tree queries.
Tools:
- ancestors_of(person, generations): walk PARENT_OF upward.
- descendants_of(person, generations): walk PARENT_OF downward.
- lineage_of(person): the Lineage node this person belongs to + its members.
"""
from server import get_neo4j, REGISTRY
def _q(query, params=None):
driver = get_neo4j()
with driver.session() as s:
result = s.run(query, params or {})
return [dict(r) for r in result]
@REGISTRY.tool(
name="ancestors_of",
description="Walk PARENT_OF upstream from a person for N generations. Returns chain of ancestors with their lifespans.",
input_schema={
"type": "object",
"properties": {
"person": {"type": "string"},
"generations": {"type": "integer", "default": 5, "minimum": 1, "maximum": 20},
},
"required": ["person"],
},
)
def ancestors_of(args):
# In our schema, (parent)-[:PARENT_OF]->(child). So to get ancestors of `person`,
# we walk PARENT_OF in the *incoming* direction, i.e. (ancestor)-[:PARENT_OF]->(person).
rows = _q("""
MATCH path = (ancestor:Person)-[:PARENT_OF*1..%d]->(p:Person {name: $person})
UNWIND nodes(path) AS n
WITH ancestor WHERE ancestor <> p
RETURN DISTINCT ancestor.name AS name, ancestor.born AS born, ancestor.died AS died,
ancestor.id AS id
ORDER BY ancestor.born ASC
""" % args.get("generations", 5), {"person": args["person"]})
return {"ancestors": rows}
@REGISTRY.tool(
name="descendants_of",
description="Walk PARENT_OF downward from a person for N generations. Returns all known descendants.",
input_schema={
"type": "object",
"properties": {
"person": {"type": "string"},
"generations": {"type": "integer", "default": 5, "minimum": 1, "maximum": 20},
},
"required": ["person"],
},
)
def descendants_of(args):
# In our schema, (parent)-[:PARENT_OF]->(child). So descendants of `person` follow
# the outgoing PARENT_OF direction.
rows = _q("""
MATCH (a:Person {name: $person})-[:PARENT_OF*1..%d]->(desc:Person)
RETURN DISTINCT desc.name AS name, desc.born AS born, desc.died AS died,
desc.id AS id
ORDER BY desc.born ASC
""" % args.get("generations", 5), {"person": args["person"]})
return {"descendants": rows}
@REGISTRY.tool(
name="lineage_of",
description="The Lineage group this person belongs to, plus all other members of the bloodline.",
input_schema={
"type": "object",
"properties": {"person": {"type": "string"}},
"required": ["person"],
},
)
def lineage_of(args):
rows = _q("""
MATCH (p:Person {name: $person})-[:MEMBER_OF]->(lin:Lineage)
OPTIONAL MATCH (other:Person)-[:MEMBER_OF]->(lin)
RETURN lin.name AS lineage, lin.id AS lineage_id,
collect(DISTINCT {name: other.name, born: other.born, died: other.died}) AS members
""", {"person": args["person"]})
if not rows:
return {"found": False, "person": args["person"]}
r = rows[0]
return {
"person": args["person"],
"lineage": r["lineage"],
"lineage_id": r["lineage_id"],
"members": r["members"],
}
def register(registry):
pass

115
plugins/trade.py Normal file
View File

@@ -0,0 +1,115 @@
"""
trade plugin — Postgres-backed operational data.
Demonstrates the polyglot pattern: a domain type (trade log entry) that
isn't a core ontology concept, backed by Postgres because it's
high-volume time-series data, queryable through the same MCP gateway.
"""
from server import get_postgres, REGISTRY
def _q(sql, params=None, fetch=True):
conn = get_postgres()
try:
with conn.cursor() as cur:
cur.execute(sql, params or ())
if fetch and cur.description:
cols = [d[0] for d in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
return []
finally:
conn.close()
@REGISTRY.tool(
name="log_trade",
description="Record a trade. Buyer and seller must exist as Person or Faction nodes (call entity_context to verify).",
input_schema={
"type": "object",
"properties": {
"buyer_id": {"type": "string"},
"seller_id": {"type": "string"},
"item_id": {"type": "string"},
"quantity": {"type": "number"},
"unit": {"type": "string", "default": "gp"},
"unit_price": {"type": "number"},
"location_id": {"type": "string"},
"in_fiction_time": {"type": "string"},
"notes": {"type": "string"},
},
"required": ["buyer_id", "seller_id", "item_id", "quantity", "unit_price"],
},
)
def log_trade(args):
total = float(args["quantity"]) * float(args["unit_price"])
_q("""
INSERT INTO trade_log
(buyer_id, seller_id, item_id, quantity, unit, unit_price, total_price, location_id, in_fiction_time, notes)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
args["buyer_id"], args["seller_id"], args["item_id"],
args["quantity"], args.get("unit", "gp"), args["unit_price"], total,
args.get("location_id"), args.get("in_fiction_time"), args.get("notes"),
), fetch=False)
return {"logged": True, "total_price": total}
@REGISTRY.tool(
name="trades_by_buyer",
description="List trades where a given entity was the buyer, most recent first.",
input_schema={
"type": "object",
"properties": {
"buyer_id": {"type": "string"},
"limit": {"type": "integer", "default": 10},
},
"required": ["buyer_id"],
},
)
def trades_by_buyer(args):
rows = _q("""
SELECT id, occurred_at, seller_id, item_id, quantity, unit, unit_price, total_price, location_id, notes
FROM trade_log
WHERE buyer_id = %s
ORDER BY occurred_at DESC
LIMIT %s
""", (args["buyer_id"], args.get("limit", 10)))
return {"buyer": args["buyer_id"], "count": len(rows), "trades": rows}
@REGISTRY.tool(
name="market_price",
description="Average price for an item_id over the last N records. Computed from the trade log.",
input_schema={
"type": "object",
"properties": {
"item_id": {"type": "string"},
"limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 1000},
},
"required": ["item_id"],
},
)
def market_price(args):
rows = _q("""
SELECT unit_price, total_price, occurred_at
FROM trade_log
WHERE item_id = %s
ORDER BY occurred_at DESC
LIMIT %s
""", (args["item_id"], args.get("limit", 100)))
if not rows:
return {"item_id": args["item_id"], "sample_size": 0}
prices = [float(r["unit_price"]) for r in rows]
return {
"item_id": args["item_id"],
"sample_size": len(prices),
"avg_unit_price": round(sum(prices) / len(prices), 2),
"min_unit_price": min(prices),
"max_unit_price": max(prices),
"most_recent": rows[0]["occurred_at"].isoformat() if rows else None,
}
def register(registry):
pass

128
plugins/world.py Normal file
View File

@@ -0,0 +1,128 @@
"""
world plugin — pure Neo4j queries.
Tools:
- entity_context(name): one-hop summary of a Person / Faction / Location / Item.
- was_true_at(relation, subject, object, at_time): time-bounded edge lookup.
- state_at(entity, at_time): comprehensive snapshot of an entity at a time.
"""
from server import get_neo4j, REGISTRY
def _q(query, params=None):
"""Run a single read query against Neo4j, return list of dicts."""
driver = get_neo4j()
with driver.session() as s:
result = s.run(query, params or {})
return [dict(r) for r in result]
@REGISTRY.tool(
name="entity_context",
description="One-hop summary of a named entity (Person, Faction, Location, Item). Returns labels, properties, and immediate relations.",
input_schema={
"type": "object",
"properties": {"name": {"type": "string", "description": "Entity name to look up"}},
"required": ["name"],
},
)
def entity_context(args):
name = args["name"]
rows = _q("""
MATCH (e)
WHERE (e:Person OR e:Faction OR e:Location OR e:Item OR e:Event)
AND (e.name = $name OR e.id = $name)
OPTIONAL MATCH (e)-[r]->(other)
WHERE type(r) IN ['MEMBER_OF','RULED','LOCATED_IN','PART_OF','PARENT_OF','SPOUSE_OF','POSSESSES','PARTICIPATED_IN']
RETURN e, labels(e) AS labels,
collect(DISTINCT {rel: type(r), to: other.name, to_id: other.id}) AS relations
LIMIT 1
""", {"name": name})
if not rows:
return {"found": False, "name": name}
r = rows[0]
e = r["e"]
return {
"found": True,
"name": e.get("name"),
"id": e.get("id"),
"labels": r["labels"],
"properties": {k: v for k, v in dict(e).items() if not k.startswith("_")},
"relations": [rel for rel in r["relations"] if rel.get("to")],
}
@REGISTRY.tool(
name="was_true_at",
description="Check whether a typed relation was true between subject and object at a given in-fiction time. Times use the canonical {era}.{year} format, e.g. '2nd_age.year_340'.",
input_schema={
"type": "object",
"properties": {
"relation": {"type": "string", "description": "Edge type, e.g. RULED, ALLIED_WITH, MEMBER_OF"},
"subject": {"type": "string"},
"object": {"type": "string"},
"at_time": {"type": "string", "description": "Canonical time string, e.g. '2nd_age.year_340'"},
},
"required": ["relation", "subject", "object", "at_time"],
},
)
def was_true_at(args):
rows = _q("""
MATCH (s {name: $subject})-[r:`%s`]->(o {name: $object})
WHERE r.valid_from IS NULL OR $at_time >= r.valid_from
AND r.valid_until IS NULL OR $at_time <= r.valid_until
RETURN r, s, o
""" % args["relation"], {
"subject": args["subject"], "object": args["object"], "at_time": args["at_time"],
})
if not rows:
return {"was_true": False, "relation": args["relation"],
"subject": args["subject"], "object": args["object"], "at_time": args["at_time"]}
r = rows[0]["r"]
return {
"was_true": True,
"relation": args["relation"],
"valid_from": r.get("valid_from"),
"valid_until": r.get("valid_until"),
}
@REGISTRY.tool(
name="state_at",
description="Snapshot of an entity at a given in-fiction time: who/what they were allied with, where they were located, what they held.",
input_schema={
"type": "object",
"properties": {
"entity": {"type": "string"},
"at_time": {"type": "string", "description": "Canonical time string, e.g. '2nd_age.year_340'"},
},
"required": ["entity", "at_time"],
},
)
def state_at(args):
rows = _q("""
MATCH (e {name: $entity})
WHERE e:Person OR e:Faction OR e:Location OR e:Item
OPTIONAL MATCH (e)-[r]->(other)
WHERE type(r) IN ['MEMBER_OF','RULED','LOCATED_IN','PART_OF','POSSESSES','ALLIED_WITH','ENEMY_OF']
AND (r.valid_from IS NULL OR $at_time >= r.valid_from)
AND (r.valid_until IS NULL OR $at_time <= r.valid_until)
RETURN e, labels(e) AS labels,
collect(DISTINCT {rel: type(r), to: other.name}) AS active_relations
LIMIT 1
""", {"entity": args["entity"], "at_time": args["at_time"]})
if not rows:
return {"found": False, "entity": args["entity"]}
r = rows[0]
return {
"entity": r["e"].get("name"),
"at_time": args["at_time"],
"labels": r["labels"],
"active_relations": [x for x in r["active_relations"] if x.get("to")],
}
def register(registry):
"""Plugin entry point — server.py calls this."""
# Decorators already registered via the @REGISTRY.tool wrappers above.
pass

40
postgres/init.sql Normal file
View File

@@ -0,0 +1,40 @@
-- Lore Engine POC — minimal Postgres schema.
-- Operational data that doesn't belong in the world graph.
CREATE TABLE IF NOT EXISTS trade_log (
id BIGSERIAL PRIMARY KEY,
world_id TEXT NOT NULL DEFAULT 'default',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(),
in_fiction_time TEXT,
buyer_id TEXT,
seller_id TEXT,
item_id TEXT,
quantity NUMERIC,
unit TEXT,
unit_price NUMERIC,
total_price NUMERIC,
location_id TEXT,
notes TEXT
);
CREATE INDEX IF NOT EXISTS trade_log_time ON trade_log (occurred_at DESC);
CREATE INDEX IF NOT EXISTS trade_log_buyer ON trade_log (buyer_id);
-- Image manifests. The actual bytes live in MinIO; this is metadata + tags
-- that the LLM can query to know what images exist.
CREATE TABLE IF NOT EXISTS image_manifest (
id BIGSERIAL PRIMARY KEY,
image_id TEXT NOT NULL UNIQUE,
object_key TEXT NOT NULL, -- the MinIO object key
entity_id TEXT, -- linked LoreEntity (e.g. Person.id)
entity_type TEXT, -- Person / Location / Event / Item
caption TEXT NOT NULL,
tags TEXT[],
era TEXT,
uploaded_at TIMESTAMPTZ NOT NULL DEFAULT now(),
width INT,
height INT,
bytes BIGINT
);
CREATE INDEX IF NOT EXISTS image_manifest_entity ON image_manifest (entity_id);
CREATE INDEX IF NOT EXISTS image_manifest_tags ON image_manifest USING GIN (tags);
CREATE INDEX IF NOT EXISTS image_manifest_era ON image_manifest (era);

209
scripts/ci-smoke.sh Executable file
View File

@@ -0,0 +1,209 @@
#!/usr/bin/env bash
# lore-engine-poc — CI smoke runner
#
# Brings up the stack from a clean working tree, waits for all four services
# to be healthy, runs the seed, runs test.sh, and exits 0/1.
#
# Designed to be run identically on a developer laptop, in CI, or in a
# one-off cron. See docs/SMOKE.md for the full rationale + troubleshooting.
#
# Usage:
# ./scripts/ci-smoke.sh # full bring-up + test + teardown
# ./scripts/ci-smoke.sh --keep-up # leave the stack running on success
# ./scripts/ci-smoke.sh --skip-build # skip `docker compose build`
#
# Exit codes:
# 0 smoke passed
# 1 a service did not become healthy in time
# 2 seed.py failed
# 3 test.sh failed
# 4 usage / argument error
# 5 docker compose not available
set -euo pipefail
# ─── argument parsing ────────────────────────────────────────────────────────
KEEP_UP=0
SKIP_BUILD=0
for arg in "$@"; do
case "$arg" in
--keep-up) KEEP_UP=1 ;;
--skip-build) SKIP_BUILD=1 ;;
-h|--help)
sed -n '2,18p' "$0" | sed 's/^# \{0,1\}//'
exit 0
;;
*)
echo "unknown arg: $arg" >&2
exit 4
;;
esac
done
# ─── helpers ────────────────────────────────────────────────────────────────
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR/.." # repo root (scripts/ is a sibling of docker-compose.yml)
REPO_ROOT="$(pwd)"
if ! command -v docker >/dev/null 2>&1; then
echo "FATAL: docker not on PATH" >&2
exit 5
fi
if ! docker compose version >/dev/null 2>&1; then
echo "FATAL: 'docker compose' (v2 plugin) not available — install the compose plugin" >&2
exit 5
fi
# Timestamped log so concurrent runs (rare) don't trample each other and so
# CI can grep the timestamped output for the failure point.
LOG="$REPO_ROOT/.smoke-$(date -u +%Y%m%dT%H%M%SZ).log"
exec > >(tee -a "$LOG") 2>&1
echo "=== ci-smoke starting at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
echo "=== repo: $REPO_ROOT"
echo "=== log: $LOG"
echo
cleanup() {
local exit_code=$?
echo
echo "=== ci-smoke exiting with code $exit_code at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
if [ $KEEP_UP -eq 0 ] && [ $exit_code -ne 0 ]; then
echo
echo "stack left running for post-mortem. Tear down with:"
echo " docker compose down -v"
fi
if [ $KEEP_UP -eq 1 ] && [ $exit_code -eq 0 ]; then
echo
echo "stack left running (--keep-up). Tear down with:"
echo " docker compose down -v"
fi
exit $exit_code
}
trap cleanup EXIT INT TERM
# ─── step 1: build + start ───────────────────────────────────────────────────
if [ $SKIP_BUILD -eq 0 ]; then
echo ">>> [1/5] docker compose build"
docker compose build
fi
echo
echo ">>> [1/5] docker compose up -d"
docker compose up -d
# ─── step 2: wait for services healthy ───────────────────────────────────────
echo
echo ">>> [2/5] waiting for neo4j, postgres, minio to be healthy (60s deadline each)"
SERVICES=(lore-neo4j lore-postgres lore-minio)
DEADLINE_SECS=60
for svc in "${SERVICES[@]}"; do
elapsed=0
while [ $elapsed -lt $DEADLINE_SECS ]; do
status=$(docker inspect -f '{{.State.Health.Status}}' "$svc" 2>/dev/null || echo "missing")
if [ "$status" = "healthy" ]; then
echo "$svc healthy (after ${elapsed}s)"
break
fi
if [ "$status" = "unhealthy" ]; then
echo "$svc reported UNHEALTHY:" >&2
docker logs --tail 50 "$svc" >&2
exit 1
fi
sleep 2
elapsed=$((elapsed + 2))
done
if [ $elapsed -ge $DEADLINE_SECS ]; then
echo "$svc did not become healthy within ${DEADLINE_SECS}s" >&2
echo " last status: $status" >&2
docker logs --tail 50 "$svc" >&2
exit 1
fi
done
# ─── step 3: wait for gateway /healthz ───────────────────────────────────────
echo
echo ">>> [3/5] waiting for gateway /healthz (60s deadline)"
elapsed=0
HEALTHZ_URL="${GATEWAY:-http://localhost:8765/healthz}"
while [ $elapsed -lt $DEADLINE_SECS ]; do
if response=$(curl -fsS "$HEALTHZ_URL" 2>/dev/null) && \
echo "$response" | python3 -c "import json,sys; d=json.loads(sys.stdin.read()); assert d.get('status')=='ok'; assert isinstance(d.get('plugins'), list) and len(d['plugins'])>0" 2>/dev/null; then
tool_count=$(echo "$response" | python3 -c "import json,sys; print(len(json.loads(sys.stdin.read())['plugins']))")
echo " ✔ gateway healthy, $tool_count tools registered"
break
fi
sleep 2
elapsed=$((elapsed + 2))
done
if [ $elapsed -ge $DEADLINE_SECS ]; then
echo " ✖ gateway /healthz did not return 200+valid JSON within ${DEADLINE_SECS}s" >&2
docker logs --tail 50 lore-gateway >&2 || true
exit 1
fi
# ─── step 4: seed (idempotent — skip if data already present) ────────────────
echo
echo ">>> [4/5] seed: check if data is already loaded"
seeded_already=0
# Probe Neo4j for any Person node. If the count > 0, treat as already seeded.
# (Cheap, ~50ms.) seed.py is idempotent so re-running is safe, but skipping
# the seed keeps the smoke fast when the caller just wants to re-verify
# test.sh against a known-good DB.
person_count=$(docker exec lore-neo4j cypher-shell -u neo4j -p lore-dev-password \
"MATCH (p:Person) RETURN count(p) AS n" 2>/dev/null \
| awk '/^[0-9]+$/{print; exit}' || echo "0")
if [ "${person_count:-0}" -gt 0 ] 2>/dev/null; then
echo " ✔ already seeded (Person count = $person_count), skipping seed.py"
seeded_already=1
fi
if [ $seeded_already -eq 0 ]; then
echo " → running python3 seed.py (host)"
if ! python3 seed.py 2>/tmp/seed.err; then
echo " ⚠ host seed.py failed: $(head -1 /tmp/seed.err)" >&2
echo " → falling back to docker run via the gateway network"
# Run seed.py inside a sidecar container on the lore-engine-poc_default
# network. We use the gateway image because it has all the python deps,
# then bind-mount the repo so seed.py can find the mock-data dir.
if ! docker run --rm --network lore-engine-poc_default \
-v "$REPO_ROOT":/work -w /work \
-e NEO4J_URL='bolt://neo4j:7687' \
-e NEO4J_USER=neo4j -e NEO4J_PASSWORD=lore-dev-password \
-e POSTGRES_URL='postgresql://lore:***@postgres:5432/lore' \
-e MINIO_URL='http://minio:9000' \
-e MINIO_ACCESS_KEY=lorelore -e MINIO_SECRET_KEY=lore-dev-password \
-e MINIO_BUCKET=lore-images \
--entrypoint python3 \
lore-engine-poc-gateway \
seed.py 2>/tmp/seed-docker.err; then
echo " ✖ seed failed in both host and docker modes" >&2
echo " host stderr: $(cat /tmp/seed.err)" >&2
echo " docker stderr: $(cat /tmp/seed-docker.err)" >&2
exit 2
fi
fi
echo " ✔ seed complete"
fi
# ─── step 5: e2e test ────────────────────────────────────────────────────────
echo
echo ">>> [5/5] bash test.sh"
if ! bash test.sh; then
echo " ✖ test.sh failed" >&2
exit 3
fi
echo " ✔ test.sh passed"
# ─── optional teardown ──────────────────────────────────────────────────────
if [ $KEEP_UP -eq 0 ]; then
echo
echo ">>> tearing down stack (use --keep-up to leave it running)"
docker compose down -v
else
echo
echo ">>> --keep-up set, stack left running"
fi
echo
echo "=== SMOKE PASSED at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
exit 0

379
seed.py Normal file
View File

@@ -0,0 +1,379 @@
#!/usr/bin/env python3
"""
Generate a high-fantasy mock world and load it into the POC stack.
Mock world: the realm of Arda, two eras (1st and 2nd Age), three factions,
ten people, two locations, four items, ten events, ten lineage edges,
a handful of trades, and four images.
This script can be run repeatedly — it's idempotent (uses MERGE in Neo4j,
ON CONFLICT in Postgres).
"""
import datetime as dt
import os
import sys
import time
from pathlib import Path
from neo4j import GraphDatabase
import psycopg2
from minio import Minio
from PIL import Image, ImageDraw, ImageFont
# ─── config (also used by docker-compose) ────────────────────────────────────
NEO4J_URL = os.environ.get("NEO4J_URL", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASS = os.environ.get("NEO4J_PASSWORD", "lore-dev-password")
PG_URL = os.environ.get("POSTGRES_URL", "postgresql://lore:***@localhost:5432/lore")
MINIO_URL = os.environ.get("MINIO_URL", "http://localhost:9000")
MINIO_USER = os.environ.get("MINIO_ACCESS_KEY", "lorelore")
MINIO_PASS = os.environ.get("MINIO_SECRET_KEY", "lore-dev-password")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "lore-images")
# ─── mock data ───────────────────────────────────────────────────────────────
PEOPLE = [
# (id, name, born, died, tier, culture)
("theron", "Theron Ashveil", 10, 120, "noble", "Valdorni"),
("maric", "Maric Vyr", 85, 160, "noble", "Valdorni"),
("aldric", "Aldric Raventhorne", 220, 285, "noble", "Valdorni"),
("elara", "Elara Raventhorne", 220, None, "noble", "Valdorni"),
("cael", "Cael Vyr", 160, 240, "noble", "Valdorni"),
("yssa", "Yssa Raventhorne", 165, None, "noble", "Valdorni"),
("vex", "Vex the Silent", 180, None, "commoner","Mardsvillan"),
("alessia", "Alessia Dusk", 190, None, "commoner","Mardsvillan"),
("kael", "General Kael", 200, None, "noble", "Crimson Pact"),
("guildmaster","Guildmaster Torren", 175, None, "noble", "Mardsvillan"),
]
FACTIONS = [
# (id, name, founded, dissolved)
("house_vyr", "House Vyr", 85, None),
("crimson_pact", "The Crimson Pact", 150, None),
("merchants", "Merchants Guild", 100, None),
]
LOCATIONS = [
# (id, name)
("valdorn", "Valdorn"),
("mardsville", "Mardsville"),
("thornwall", "Thornwall Keep"),
("black_spire", "Black Spire Pass"),
]
ERAS = [
# (slug, name, start, end, parent)
("1st_age", "First Age", 0, 100, None),
("2nd_age", "Second Age", 100, 300, None),
("2nd_age.age_of_iron", "Age of Iron", 150, 300, "2nd_age"),
]
EVENTS = [
# (id, name, in_fiction_time, era_slug, location_id)
("e1", "Battle of Black Spire", "2nd_age.year_232", "2nd_age", "black_spire"),
("e2", "Founding of House Vyr", "2nd_age.year_85", "2nd_age", "valdorn"),
("e3", "Crimson Pact Founded", "2nd_age.year_150", "2nd_age", "mardsville"),
("e4", "Aldric becomes lord", "2nd_age.year_240", "2nd_age", "thornwall"),
("e5", "The Mardsville Heist", "2nd_age.year_265", "2nd_age", "mardsville"),
("e6", "Crimson Pact attacks Thornwall", "2nd_age.year_280", "2nd_age", "thornwall"),
]
ITEMS = [
# (id, name, kind)
("sword_eventide", "Sword of Eventide", "weapon"),
("pale_ledger", "The Pale Ledger", "document"),
("ruby_eye", "Ruby Eye of Kael", "artifact"),
("silver_locket", "Elara's Locket", "jewelry"),
]
# Time-bounded relations (the interesting ones — not just static)
RELATIONS = [
# (from_kind, from_id, rel, to_kind, to_id, valid_from, valid_until)
("Person", "theron", "PARENT_OF", "Person", "maric", "1st_age.year_50", "2nd_age.year_120"),
("Person", "maric", "PARENT_OF", "Person", "cael", "2nd_age.year_180", None),
("Person", "cael", "PARENT_OF", "Person", "aldric", "2nd_age.year_240", "2nd_age.year_285"),
("Person", "yssa", "PARENT_OF", "Person", "aldric", "2nd_age.year_240", "2nd_age.year_285"),
("Person", "aldric", "SPOUSE_OF", "Person", "elara", "2nd_age.year_250", None),
("Person", "theron", "FOUNDED", "Faction", "house_vyr", "1st_age.year_85", None),
("Person", "maric", "MEMBER_OF", "Faction", "house_vyr", "2nd_age.year_100", "2nd_age.year_160"),
("Person", "aldric", "MEMBER_OF", "Faction", "house_vyr", "2nd_age.year_240", None),
("Person", "aldric", "RULES", "Location","thornwall","2nd_age.year_240", "2nd_age.year_285"),
("Person", "kael", "MEMBER_OF", "Faction", "crimson_pact","2nd_age.year_200", None),
("Faction","crimson_pact","RULES","Location", "mardsville","2nd_age.year_160", "2nd_age.year_232"),
("Faction","house_vyr","ALLIED_WITH","Faction","merchants", "2nd_age.year_100", None),
("Faction","crimson_pact","ENEMY_OF","Faction","house_vyr", "2nd_age.year_150", None),
("Person","aldric","POSSESSES","Item","sword_eventide", "2nd_age.year_245", None),
("Person","elara","POSSESSES","Item","silver_locket", "2nd_age.year_250", None),
("Location","thornwall","PART_OF","Location","valdorn", None, None),
("Location","mardsville","PART_OF","Location","valdorn", None, None),
("Event","e1","PARTICIPATED_IN","Person","aldric", "2nd_age.year_232", "2nd_age.year_232"),
("Event","e1","PARTICIPATED_IN","Person","kael", "2nd_age.year_232", "2nd_age.year_232"),
("Event","e5","PARTICIPATED_IN","Person","vex", "2nd_age.year_265", "2nd_age.year_265"),
("Event","e6","PARTICIPATED_IN","Person","aldric", "2nd_age.year_280", "2nd_age.year_280"),
]
# Lineage group
LINEAGES = [
("house_vyr_bloodline", "House Vyr (bloodline)", "theron"),
]
# Trade log entries (Postgres)
TRADES = [
# (buyer, seller, item, qty, unit, unit_price, in_fiction_time, location, notes)
("aldric", "guildmaster", "pale_ledger", 1, "gp", 500, "2nd_age.year_265", "mardsville", "Aldric bought the Pale Ledger via Vex"),
("elara", "guildmaster", "silver_locket", 1, "gp", 120, "2nd_age.year_255", "mardsville", "Gift for Elara"),
("kael", "guildmaster", "ruby_eye", 1, "gp", 900, "2nd_age.year_270", "mardsville", "Crimson Pact acquisition"),
]
# Images
IMAGES = [
# (image_id, object_key, entity_id, entity_type, caption, tags, era)
("img_aldric_portrait", "characters/aldric_portrait.png", "aldric", "Person",
"Portrait of Aldric Raventhorne, Lord of Thornwall. Middle-aged, dark hair, a scar above the left eye.",
["portrait", "noble", "thornwall"], "2nd_age"),
("img_vex_portrait", "characters/vex_portrait.png", "vex", "Person",
"Vex the Silent, a hooded thief from the alleys of Mardsville. Face mostly in shadow.",
["portrait", "thief", "mardsville"], "2nd_age"),
("img_thornwall", "places/thornwall.png", "thornwall", "Location",
"Thornwall Keep at dawn. The banners of House Vyr fly from the battlements.",
["keep", "house_vyr", "dawn"], "2nd_age"),
("img_battle", "events/battle_of_black_spire.png", "e1", "Event",
"The Battle of Black Spire, where Aldric defeated General Kael. House Vyr's banners hold the ridge.",
["battle", "aldric", "kael", "house_vyr"], "2nd_age"),
]
# ─── helpers ─────────────────────────────────────────────────────────────────
def load_neo4j():
print(f"[neo4j] connecting to {NEO4J_URL}")
d = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USER, NEO4J_PASS))
# wait for neo4j
for i in range(30):
try:
d.verify_connectivity()
return d
except Exception as e:
print(f"[neo4j] not ready ({e}); retry {i}")
time.sleep(2)
raise RuntimeError("neo4j never came up")
def load_postgres():
print(f"[postgres] connecting to {PG_URL}")
for i in range(30):
try:
return psycopg2.connect(PG_URL)
except Exception as e:
print(f"[postgres] not ready ({e}); retry {i}")
time.sleep(2)
raise RuntimeError("postgres never came up")
def load_minio():
print(f"[minio] connecting to {MINIO_URL}")
for i in range(30):
try:
c = Minio(MINIO_URL.replace("http://", ""),
access_key=MINIO_USER, secret_key=MINIO_PASS, secure=False)
# Make sure bucket exists
if not c.bucket_exists(MINIO_BUCKET):
c.make_bucket(MINIO_BUCKET)
return c
except Exception as e:
print(f"[minio] not ready ({e}); retry {i}")
time.sleep(2)
raise RuntimeError("minio never came up")
# ─── seeder functions ────────────────────────────────────────────────────────
def seed_neo4j(driver):
with driver.session() as s:
# Constraints
for label in ["Person", "Faction", "Location", "Item", "Event", "Era", "Lineage"]:
s.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.id IS UNIQUE")
s.run("CREATE CONSTRAINT era_slug IF NOT EXISTS FOR (e:Era) REQUIRE e.slug IS UNIQUE")
# Eras
for slug, name, start, end, parent in ERAS:
s.run("""
MERGE (e:Era {slug: $slug})
SET e.name = $name, e.start = $start, e.end = $end, e.parent_slug = $parent
""", slug=slug, name=name, start=start, end=end, parent=parent)
for slug, _, _, _, parent in ERAS:
if parent:
s.run("""
MATCH (child:Era {slug: $slug}), (parent:Era {slug: $p})
MERGE (child)-[:PART_OF]->(parent)
""", slug=slug, p=parent)
print(f"[neo4j] seeded {len(ERAS)} eras")
# People
for pid, name, born, died, tier, culture in PEOPLE:
s.run("""
MERGE (p:Person {id: $pid})
SET p.name = $name, p.born = $born, p.died = $died,
p.tier = $tier, p.culture = $culture
""", pid=pid, name=name, born=born, died=died, tier=tier, culture=culture)
print(f"[neo4j] seeded {len(PEOPLE)} people")
# Factions
for fid, name, founded, dissolved in FACTIONS:
s.run("""
MERGE (f:Faction {id: $fid})
SET f.name = $name, f.founded = $founded, f.dissolved = $dissolved
""", fid=fid, name=name, founded=founded, dissolved=dissolved)
print(f"[neo4j] seeded {len(FACTIONS)} factions")
# Locations
for lid, name in LOCATIONS:
s.run("MERGE (l:Location {id: $lid}) SET l.name = $name",
lid=lid, name=name)
print(f"[neo4j] seeded {len(LOCATIONS)} locations")
# Items
for iid, name, kind in ITEMS:
s.run("MERGE (i:Item {id: $iid}) SET i.name = $name, i.kind = $kind",
iid=iid, name=name, kind=kind)
print(f"[neo4j] seeded {len(ITEMS)} items")
# Events
for eid, name, when, era_slug, loc_id in EVENTS:
s.run("""
MERGE (e:Event {id: $eid})
SET e.name = $name, e.in_fiction_time = $when
WITH e
MATCH (era:Era {slug: $era_slug})
MERGE (e)-[:OCCURRED_DURING]->(era)
WITH e
MATCH (l:Location {id: $loc_id})
MERGE (e)-[:OCCURRED_AT]->(l)
""", eid=eid, name=name, when=when, era_slug=era_slug, loc_id=loc_id)
print(f"[neo4j] seeded {len(EVENTS)} events")
# Lineages
for lin_id, name, founder in LINEAGES:
s.run("""
MERGE (l:Lineage {id: $lin_id})
SET l.name = $name
WITH l
MATCH (f:Person {id: $founder})
MERGE (l)-[:FOUNDED_BY]->(f)
""", lin_id=lin_id, name=name, founder=founder)
# Add all Vyr-lineage people
for pid, *_ in PEOPLE:
if pid in {"theron", "maric", "cael", "aldric"}:
s.run("""
MATCH (l:Lineage {id: $lin_id}), (p:Person {id: $pid})
MERGE (p)-[:MEMBER_OF]->(l)
""", lin_id=lin_id, pid=pid)
print(f"[neo4j] seeded {len(LINEAGES)} lineages")
# Time-bounded relations
for fk, fid, rel, tk, tid, vf, vu in RELATIONS:
s.run(f"""
MATCH (a {{id: $fid}})
MATCH (b {{id: $tid}})
MERGE (a)-[r:`{rel}`]->(b)
SET r.valid_from = $vf, r.valid_until = $vu
""", fid=fid, tid=tid, vf=vf, vu=vu)
print(f"[neo4j] seeded {len(RELATIONS)} time-bounded relations")
def seed_postgres(conn):
with conn.cursor() as cur:
for buyer, seller, item, qty, unit, price, when, loc, notes in TRADES:
cur.execute("""
INSERT INTO trade_log
(buyer_id, seller_id, item_id, quantity, unit, unit_price, total_price,
location_id, in_fiction_time, notes)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING
""", (buyer, seller, item, qty, unit, price, qty * price, loc, when, notes))
conn.commit()
print(f"[postgres] seeded {len(TRADES)} trade_log rows")
def make_placeholder_image(text: str, color: tuple) -> Image.Image:
"""Generate a simple 512x768 placeholder image with text on a colored background."""
img = Image.new("RGB", (512, 768), color=color)
d = ImageDraw.Draw(img)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSerif-Bold.ttf", 36)
except Exception:
font = ImageFont.load_default()
# Wrap text roughly
lines = []
words = text.split()
line = ""
for w in words:
if len(line) + len(w) + 1 > 24:
lines.append(line)
line = w
else:
line = (line + " " + w).strip()
if line:
lines.append(line)
y = 280
for ln in lines[:6]:
bbox = d.textbbox((0, 0), ln, font=font)
w = bbox[2] - bbox[0]
d.text(((512 - w) // 2, y), ln, fill=(255, 255, 255), font=font)
y += 60
d.text((20, 720), "lore-engine-poc mock", fill=(180, 180, 180), font=font)
return img
def seed_minio(client, pg_conn):
palette = {
"Person": (60, 40, 90), # purple
"Location": (40, 70, 50), # dark green
"Event": (110, 40, 30), # dark red
"Item": (110, 90, 20), # gold
"Faction": (50, 50, 80), # slate
}
with pg_conn.cursor() as cur:
for image_id, object_key, entity_id, entity_type, caption, tags, era in IMAGES:
# 1. Generate + upload the image bytes
img = make_placeholder_image(caption, palette.get(entity_type, (50, 50, 50)))
tmp = f"/tmp/{image_id}.png"
img.save(tmp, "PNG")
size = Path(tmp).stat().st_size
client.fput_object(MINIO_BUCKET, object_key, tmp, content_type="image/png")
# 2. Register manifest in Postgres
cur.execute("""
INSERT INTO image_manifest
(image_id, object_key, entity_id, entity_type, caption, tags, era, width, height, bytes)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (image_id) DO UPDATE
SET object_key = EXCLUDED.object_key,
caption = EXCLUDED.caption,
tags = EXCLUDED.tags
""", (image_id, object_key, entity_id, entity_type, caption, tags, era,
img.width, img.height, size))
os.unlink(tmp)
pg_conn.commit()
print(f"[minio+postgres] seeded {len(IMAGES)} images")
# ─── main ────────────────────────────────────────────────────────────────────
def main():
driver = load_neo4j()
pg = load_postgres()
minio = load_minio()
seed_neo4j(driver)
seed_postgres(pg)
seed_minio(minio, pg)
pg.close()
driver.close()
print("\n✅ mock world loaded — try the MCP gateway at http://localhost:8765/mcp")
if __name__ == "__main__":
main()

68
test.sh Executable file
View File

@@ -0,0 +1,68 @@
#!/usr/bin/env bash
# lore-engine-poc — end-to-end test
# Calls every tool type and checks for reasonable responses.
# Run with: bash test.sh
set -e
GATEWAY=${GATEWAY:-http://localhost:8765/mcp}
call() {
local name=$1; shift
local args=$1; shift
curl -s -X POST "$GATEWAY" \
-H "Content-Type: application/json" \
-d "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/call\",\"params\":{\"name\":\"$name\",\"arguments\":$args}}" \
| python3 -c "import json,sys; d=json.load(sys.stdin); print(d['result']['content'][0]['text'])"
}
echo "=== 1. entity_context(Aldric Raventhorne) ==="
call entity_context '{"name":"Aldric Raventhorne"}' | python3 -m json.tool | head -8
echo
echo "=== 2. was_true_at(House Vyr allied Merchants Guild @ 2nd_age.year_230) ==="
call was_true_at '{"relation":"ALLIED_WITH","subject":"House Vyr","object":"Merchants Guild","at_time":"2nd_age.year_230"}'
echo
echo "=== 3. was_true_at(Crimson Pact allied House Vyr @ 2nd_age.year_230 — should be false) ==="
call was_true_at '{"relation":"ALLIED_WITH","subject":"Crimson Pact","object":"House Vyr","at_time":"2nd_age.year_230"}'
echo
echo "=== 4. state_at(Aldric Raventhorne @ 2nd_age.year_260) ==="
call state_at '{"entity":"Aldric Raventhorne","at_time":"2nd_age.year_260"}' | python3 -m json.tool | head -10
echo
echo "=== 5. ancestors_of(Aldric Raventhorne, 5 generations) ==="
call ancestors_of '{"person":"Aldric Raventhorne","generations":5}' | python3 -c "import json,sys; print(f'ancestor count: {json.load(sys.stdin)[\"ancestors\"].__len__()}')"
echo
echo "=== 6. lineage_of(Aldric Raventhorne) ==="
call lineage_of '{"person":"Aldric Raventhorne"}' | python3 -c "import json,sys; print(f'lineage: {json.load(sys.stdin)[\"lineage\"]}, members: {len(json.load(open(\"/dev/null\"))) if False else len(json.load(open(\"/dev/null\"))) or \"see above\"}')" 2>/dev/null || call lineage_of '{"person":"Aldric Raventhorne"}'
echo
echo "=== 7. log_trade(new) ==="
call log_trade '{"buyer_id":"aldric","seller_id":"guildmaster","item_id":"sword_eventide","quantity":1,"unit":"gp","unit_price":750,"in_fiction_time":"2nd_age.year_275","location_id":"thornwall","notes":"blacksmith of thornwall"}'
echo
echo "=== 8. market_price(pale_ledger) ==="
call market_price '{"item_id":"pale_ledger"}'
echo
echo "=== 9. recall_images(entity_id=aldric) ==="
IMG=$(call recall_images '{"entity_id":"aldric"}')
echo "$IMG" | python3 -c "import json,sys; d=json.load(sys.stdin); print(f'image count: {d[\"count\"]}'); print('first caption:', d['images'][0]['caption'][:60] if d['images'] else 'none')"
URL=$(echo "$IMG" | python3 -c "import json,sys; d=json.load(sys.stdin); print(d['images'][0]['presigned_url']) if d['images'] else exit(1)")
echo "first image URL: ${URL:0:80}..."
echo
echo "--- fetching the presigned URL ---"
curl -s -o /tmp/aldric_test.png -w "HTTP %{http_code} | size %{size_download} bytes | type %{content_type}\n" "$URL"
file /tmp/aldric_test.png
echo
echo "=== 10. search_images_by_caption(q=aldric) ==="
call search_images_by_caption '{"q":"aldric"}' | python3 -c "import json,sys; d=json.load(sys.stdin); print(f'matches: {d[\"count\"]}'); [print(f' - {img[\"entity_type\"]}:{img[\"entity_id\"]} — {img[\"caption\"][:50]}...') for img in d['images']]"
echo
echo "=== 11. register_image(new) ==="
call register_image '{"image_id":"img_test","object_key":"test/x.png","entity_id":"aldric","entity_type":"Person","caption":"test image","tags":["test"],"era":"2nd_age"}'
echo
echo "✅ all tool types tested"