Files
lore-engine-poc/plugins/images.py
Kay 4f922899af T6: multi-world namespace — world_id on every node, list_worlds(), arda_greyscale seed
Every read tool (entity_context, state_at, was_true_at, ancestors_of,
descendants_of, lineage_of, recall_images, search_images_by_caption,
register_image) now accepts an optional world_id parameter, defaulting
to 'default' so v1 callers see no change.

* Neo4j: every Person/Faction/Location/Item/Event/Lineage/Image node
  carries a world_id string property. seed.py backfills existing nodes
  to 'default' and writes a second minimal world 'arda_greyscale'
  (2 people, 1 faction, 1 location, 4 relations, 1 event, 1 era).
* Cypher: every MATCH in the world/lineage/images plugins filters by
  world_id on the right node type.
* New admin tool list_worlds() — runs
  MATCH (n) WHERE n.world_id IS NOT NULL RETURN DISTINCT n.world_id
  to expose the namespace.
* Postgres image_manifest gains a world_id column (init.sql).
* 4 placeholder images generated for the greyscale world (portraits of
  Mael and Sira, Ashen Hall, the Ashen Oath).
* test.sh now calls every tool with world_id='default' to verify v1
  behaviour, plus a new section 12 that calls list_worlds().
* tests/test_multi_world.py: 14 pytest cases covering list_worlds,
  entity_context/world isolation, was_true_at, state_at, lineage
  filter, image recall isolation, and the image_manifest schema.

Verification:
  pytest tests/test_multi_world.py         14/14 pass
  bash test.sh                             12/12 sections green,
                                           MinIO image bytes flow 200 OK
  list_worlds() returns [{arda_greyscale}, {default}]
2026-06-16 23:09:40 +00:00

272 lines
11 KiB
Python

"""
images plugin — MinIO-backed image recall with Neo4j entity links.
Demonstrates the "different DB for different purpose" pattern:
- Postgres holds image MANIFESTS (metadata, tags, captions) so the LLM
can decide which images to surface.
- MinIO holds the actual BYTES (PNGs, JPEGs).
- Neo4j holds the LINK from an image to the entity it depicts.
The LLM calls recall_images(entity=...) to get back a list of
{image_id, caption, object_key, presigned_url} so it can either describe
the image (from caption) or fetch the bytes (from the presigned URL).
Per docs/01-ontology.md, every row carries a `world_id` namespace and
all reads filter by it. The default is "default".
"""
import datetime as dt
import logging
import os
import threading
from urllib.parse import urlparse, urlunparse
from server import get_postgres, get_neo4j, get_minio, REGISTRY
# Module logger, named after this module's import path.
LOG = logging.getLogger(__name__)
# World namespace used when a tool call does not supply a `world_id`.
DEFAULT_WORLD = "default"
# Module-level state for the background-embedding hook. We only start
# the thread once per gateway process; subsequent register_image calls
# reuse it.
# Flag: set to True by _start_embed_worker_once() after the worker thread
# has been started.
_embed_thread_started = False
# Guards the check-and-set of _embed_thread_started so that concurrent
# callers cannot both start a worker.
_embed_thread_lock = threading.Lock()
def _start_embed_worker_once():
    """Start the background embedding worker unless it is already running.

    The worker is a daemon thread that calls
    ``plugins.embeddings._do_embed_images(limit=50)`` every two seconds and
    logs how many embeddings each pass wrote. The embeddings plugin is
    imported inside the thread rather than at module top, so importing this
    module never depends on that plugin being importable.

    Safe to call repeatedly: the started flag is checked and set under
    ``_embed_thread_lock``. If the embeddings plugin cannot be imported, the
    worker logs the failure, clears the flag and exits, so a later call gets
    another chance to start it instead of the worker staying dead for the
    rest of the process.
    """
    global _embed_thread_started

    def _worker():
        global _embed_thread_started
        import time

        try:
            from plugins.embeddings import _do_embed_images
        except Exception:
            LOG.exception(
                "embed_worker: cannot import plugins.embeddings; worker exiting"
            )
            # Allow a later register_image call to retry. Taking the lock
            # also orders this reset after the starter's own flag update.
            with _embed_thread_lock:
                _embed_thread_started = False
            return

        while True:
            try:
                n = _do_embed_images(limit=50)
                if n:
                    LOG.info("embed_worker: wrote %s new embeddings", n)
            except Exception as e:
                # Best effort: one failed pass must not kill the worker.
                LOG.exception("embed_worker: %s", e)
            time.sleep(2)

    with _embed_thread_lock:
        if _embed_thread_started:
            return
        t = threading.Thread(target=_worker, name="embed-worker", daemon=True)
        t.start()
        _embed_thread_started = True
        LOG.info("started embed_worker background thread")
def _world(args):
return args.get("world_id") or DEFAULT_WORLD
def _q_neo4j(query, params=None):
    """Run a Cypher *query* and return every result record as a dict.

    A falsy *params* is sent as an empty mapping. Records are converted to
    dicts while the session is still open.
    """
    bound = params if params else {}
    records = []
    with get_neo4j().session() as session:
        for record in session.run(query, bound):
            records.append(dict(record))
    return records
def _q_pg(sql, params=None, fetch=True, commit=False):
    """Execute one SQL statement on a connection from ``get_postgres()``.

    Args:
        sql: Statement text with ``%s`` placeholders.
        params: Placeholder values; a falsy value is sent as an empty tuple.
        fetch: When true and the statement produced a result set, return it.
        commit: When true, commit right after executing.

    Returns:
        A list of ``{column_name: value}`` dicts, or ``[]`` when nothing was
        fetched.

    The connection is closed on every exit path, including errors.
    """
    connection = get_postgres()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params if params else ())
            if commit:
                connection.commit()
            # No result set requested, or the statement returned none.
            if not (fetch and cursor.description):
                return []
            names = [column[0] for column in cursor.description]
            return [dict(zip(names, row)) for row in cursor.fetchall()]
    finally:
        connection.close()
def _presign(object_key: str) -> str:
    """Return a presigned GET URL, valid for one hour, for *object_key*.

    Environment:
        MINIO_URL: endpoint this process uses to reach MinIO (required).
        MINIO_PUBLIC_URL: endpoint clients fetch from; defaults to MINIO_URL.
        MINIO_BUCKET: bucket that holds the object (required).
        MINIO_ACCESS_KEY, MINIO_SECRET_KEY: credentials, read only when the
            public and internal endpoints differ.

    When both endpoints are the same, the shared MinIO client signs the URL.
    Otherwise the URL is signed directly against the public endpoint with a
    SigV4 boto3 client, so the host covered by the signature is the host in
    the URL that is handed out.

    Raises:
        KeyError: if a required environment variable is missing.
    """
    internal = os.environ["MINIO_URL"]
    public = os.environ.get("MINIO_PUBLIC_URL", internal)
    bucket = os.environ["MINIO_BUCKET"]

    if public == internal:
        # Easy path: same endpoint, the regular client can sign.
        return get_minio().presigned_get_object(
            bucket, object_key, expires=dt.timedelta(hours=1)
        )

    # Only this path needs boto3, so import it here.
    import boto3
    from botocore.config import Config

    # Rebuild scheme://host[:port] from the public URL. The port is kept
    # only when the URL names one: forcing ":80" would break an https
    # endpoint that relies on its default port.
    parsed = urlparse(public)
    host = parsed.hostname or ""
    if ":" in host:
        # urlparse strips the brackets from IPv6 literals; restore them.
        host = f"[{host}]"
    if parsed.port is not None:
        host = f"{host}:{parsed.port}"

    s3 = boto3.client(
        "s3",
        endpoint_url=f"{parsed.scheme}://{host}",
        aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": object_key},
        ExpiresIn=3600,
    )
@REGISTRY.tool(
    name="register_image",
    description="Register an image in the manifest. Idempotent on (image_id). The object must already be in MinIO at the given object_key.",
    input_schema={
        "type": "object",
        "properties": {
            "image_id": {"type": "string", "description": "Caller-chosen unique id"},
            "object_key": {"type": "string", "description": "MinIO object key, e.g. 'characters/aldric.png'"},
            "entity_id": {"type": "string", "description": "Linked Neo4j entity id (Person.id, Location.id, etc.)"},
            "entity_type": {"type": "string", "enum": ["Person", "Faction", "Location", "Item", "Event"]},
            "caption": {"type": "string", "description": "1-3 sentences describing the image for the LLM"},
            "tags": {"type": "array", "items": {"type": "string"}},
            "era": {"type": "string", "description": "Canonical era slug, e.g. '2nd_age'"},
            "world_id": {"type": "string", "default": DEFAULT_WORLD,
                         "description": "World namespace; defaults to 'default'"},
            "width": {"type": "integer"},
            "height": {"type": "integer"},
            "bytes": {"type": "integer"},
        },
        "required": ["image_id", "object_key", "caption"],
    },
)
def register_image(args):
    """Upsert an image manifest row and link the image to its entity.

    Steps:
      1. Insert the row into ``image_manifest``; on an ``image_id`` conflict
         every supplied column replaces the stored value (last write wins),
         so the row never mixes old and new metadata.
      2. When both ``entity_id`` and ``entity_type`` are given, merge an
         ``Image`` node in Neo4j (namespaced by ``world_id``), refresh its
         caption and era, and merge a ``DEPICTS`` edge to the entity. If no
         entity node matches, the Cypher statement does nothing.
      3. Make sure the background embedding worker is running.

    Args:
        args: Tool arguments as described by the input schema above.

    Returns:
        ``{"registered": True, "image_id": ..., "world_id": ...}``.

    Raises:
        KeyError: if ``image_id``, ``object_key`` or ``caption`` is missing.
    """
    world_id = _world(args)
    image_id = args["image_id"]
    entity_id = args.get("entity_id")
    entity_type = args.get("entity_type")

    _q_pg("""
        INSERT INTO image_manifest
            (image_id, world_id, object_key, entity_id, entity_type, caption, tags, era, width, height, bytes)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (image_id) DO UPDATE
            SET world_id = EXCLUDED.world_id,
                object_key = EXCLUDED.object_key,
                entity_id = EXCLUDED.entity_id,
                entity_type = EXCLUDED.entity_type,
                caption = EXCLUDED.caption,
                tags = EXCLUDED.tags,
                era = EXCLUDED.era,
                width = EXCLUDED.width,
                height = EXCLUDED.height,
                bytes = EXCLUDED.bytes
    """, (
        image_id, world_id, args["object_key"], entity_id,
        entity_type, args["caption"], args.get("tags", []),
        args.get("era"), args.get("width"), args.get("height"), args.get("bytes"),
    ), fetch=False, commit=True)

    # Link in Neo4j so entity_context can see "this image depicts X".
    # The Image node is also namespaced by world_id. Caption and era are
    # set on every call (not only on creation) so a re-registration keeps
    # the node in step with the manifest row.
    # NOTE(review): the MATCH selects the entity by id and world_id only;
    # entity_type is required for linking but is not used as a label
    # filter - confirm whether ids are unique across node types.
    if entity_id and entity_type:
        _q_neo4j("""
            MATCH (e {id: $entity_id, world_id: $world_id})
            MERGE (img:Image {id: $image_id, world_id: $world_id})
            SET img.caption = $caption, img.era = $era
            MERGE (img)-[:DEPICTS]->(e)
        """, {
            "entity_id": entity_id, "image_id": image_id,
            "world_id": world_id,
            "caption": args["caption"], "era": args.get("era"),
        })

    # Kick off (or wake up) the background embed worker so the new image
    # is searchable by `search_images_semantic` within a few seconds.
    _start_embed_worker_once()
    return {"registered": True, "image_id": image_id, "world_id": world_id}
@REGISTRY.tool(
    name="recall_images",
    description="Recall images for an entity in a given world. Returns a list of {image_id, caption, tags, era, presigned_url}.",
    input_schema={
        "type": "object",
        "properties": {
            "entity_id": {"type": "string", "description": "Person.id / Location.id / etc."},
            "tag": {"type": "string", "description": "Optional tag filter (e.g. 'portrait', 'battle')"},
            "limit": {"type": "integer", "default": 5},
            "world_id": {"type": "string", "default": DEFAULT_WORLD},
        },
        "required": ["entity_id"],
    },
)
def recall_images(args):
    """List the newest manifest rows for one entity within one world.

    An optional ``tag`` narrows the result to rows whose ``tags`` array
    contains it. Each returned image carries a presigned URL for its bytes.

    Returns:
        ``{"entity_id", "world_id", "count", "images"}`` where ``images`` is
        ordered newest upload first and capped at ``limit`` (default 5).
    """
    world_id = _world(args)
    entity_id = args["entity_id"]
    tag = args.get("tag")
    limit = args.get("limit", 5)

    # Pick the statement and its parameters first, then run a single query.
    if tag:
        sql = """
            SELECT image_id, world_id, caption, tags, era, object_key
            FROM image_manifest
            WHERE entity_id = %s AND world_id = %s AND %s = ANY(tags)
            ORDER BY uploaded_at DESC LIMIT %s
        """
        params = (entity_id, world_id, tag, limit)
    else:
        sql = """
            SELECT image_id, world_id, caption, tags, era, object_key
            FROM image_manifest
            WHERE entity_id = %s AND world_id = %s
            ORDER BY uploaded_at DESC LIMIT %s
        """
        params = (entity_id, world_id, limit)

    images = [
        {
            "image_id": row["image_id"],
            "world_id": row["world_id"],
            "caption": row["caption"],
            "tags": row["tags"],
            "era": row["era"],
            "presigned_url": _presign(row["object_key"]),
        }
        for row in _q_pg(sql, params)
    ]
    return {
        "entity_id": entity_id,
        "world_id": world_id,
        "count": len(images),
        "images": images,
    }
@REGISTRY.tool(
    name="search_images_by_caption",
    description="Find images whose caption or tags contain a substring, in a given world. Use this when the LLM doesn't know the exact entity id.",
    input_schema={
        "type": "object",
        "properties": {
            "q": {"type": "string", "description": "Substring to search for in caption or tags"},
            "limit": {"type": "integer", "default": 5},
            "world_id": {"type": "string", "default": DEFAULT_WORLD},
        },
        "required": ["q"],
    },
)
def search_images_by_caption(args):
    """Case-insensitive substring search over captions and tags in one world.

    The query text is matched literally: ``%``, ``_`` and ``\\`` in ``q`` are
    escaped before being wrapped in ``%...%``, so they are not treated as
    ILIKE wildcards.

    Returns:
        ``{"q", "world_id", "count", "images"}`` where ``images`` is ordered
        newest upload first and capped at ``limit`` (default 5); each image
        carries a presigned URL for its bytes.

    Raises:
        KeyError: if ``q`` is missing.
    """
    world_id = _world(args)

    # Escape LIKE metacharacters with a backslash (the default LIKE escape
    # character in Postgres). The backslash itself must be handled first so
    # the escapes added for "%" and "_" are not doubled.
    needle = str(args["q"])
    for meta in ("\\", "%", "_"):
        needle = needle.replace(meta, "\\" + meta)
    like = f"%{needle}%"

    rows = _q_pg("""
        SELECT image_id, world_id, entity_id, entity_type, caption, tags, era, object_key
        FROM image_manifest
        WHERE world_id = %s
          AND (caption ILIKE %s OR EXISTS (SELECT 1 FROM unnest(tags) tag WHERE tag ILIKE %s))
        ORDER BY uploaded_at DESC LIMIT %s
    """, (world_id, like, like, args.get("limit", 5)))

    out = [
        {
            "image_id": r["image_id"],
            "world_id": r["world_id"],
            "entity_id": r["entity_id"],
            "entity_type": r["entity_type"],
            "caption": r["caption"],
            "tags": r["tags"],
            "era": r["era"],
            "presigned_url": _presign(r["object_key"]),
        }
        for r in rows
    ]
    return {"q": args["q"], "world_id": world_id, "count": len(out), "images": out}
def register(registry):
    """Do nothing; kept as an explicit no-op entry point.

    The tools in this module attach themselves through the
    ``@REGISTRY.tool`` decorators when the module is imported, so there is
    nothing left to add to *registry* here.
    """
    return None