Every read tool (entity_context, state_at, was_true_at, ancestors_of,
descendants_of, lineage_of, recall_images, search_images_by_caption),
plus the register_image write tool, now accepts an optional world_id
parameter, defaulting to 'default' so v1 callers see no change.
* Neo4j: every Person/Faction/Location/Item/Event/Lineage/Image node
carries a world_id string property. seed.py backfills existing nodes
to 'default' and writes a second minimal world 'arda_greyscale'
(2 people, 1 faction, 1 location, 4 relations, 1 event, 1 era).
* Cypher: every MATCH in the world/lineage/images plugins filters by
world_id on the right node type.
* New admin tool list_worlds() — runs
MATCH (n) WHERE n.world_id IS NOT NULL RETURN DISTINCT n.world_id
to expose the namespace.
* Postgres image_manifest gains a world_id column (init.sql).
* 4 placeholder images generated for the greyscale world (portraits of
Mael and Sira, Ashen Hall, the Ashen Oath).
* test.sh now calls every tool with world_id='default' to verify v1
behaviour, plus a new section 12 that calls list_worlds().
* tests/test_multi_world.py: 14 pytest cases covering list_worlds,
entity_context/world isolation, was_true_at, state_at, lineage
filter, image recall isolation, and the image_manifest schema.
Verification:
pytest tests/test_multi_world.py 14/14 pass
bash test.sh 12/12 sections green,
MinIO image bytes flow 200 OK
list_worlds() returns [{arda_greyscale}, {default}]
272 lines
11 KiB
Python
272 lines
11 KiB
Python
"""
|
|
images plugin — MinIO-backed image recall with Neo4j entity links.
|
|
|
|
Demonstrates the "different DB for different purpose" pattern:
|
|
- Postgres holds image MANIFESTS (metadata, tags, captions) so the LLM
|
|
can decide which images to surface.
|
|
- MinIO holds the actual BYTES (PNGs, JPEGs).
|
|
- Neo4j holds the LINK from an image to the entity it depicts.
|
|
|
|
The LLM calls recall_images(entity_id=...) to get back a list of
{image_id, world_id, caption, tags, era, presigned_url} so it can either
describe the image (from caption) or fetch the bytes (from the presigned URL).
|
|
|
|
Per docs/01-ontology.md, every row carries a `world_id` namespace and
|
|
all reads filter by it. The default is "default".
|
|
"""
|
|
import datetime as dt
|
|
import logging
|
|
import os
|
|
import threading
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
from server import get_postgres, get_neo4j, get_minio, REGISTRY
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
DEFAULT_WORLD = "default"
|
|
|
|
# Module-level state for the background-embedding hook. We only start
|
|
# the thread once per gateway process; subsequent register_image calls
|
|
# reuse it.
|
|
_embed_thread_started = False
|
|
_embed_thread_lock = threading.Lock()
|
|
|
|
|
|
def _start_embed_worker_once():
    """Start the background embedding worker thread, at most once per process.

    The worker is a daemon thread named "embed-worker". Every two seconds it
    calls ``plugins.embeddings._do_embed_images(limit=50)`` and, when the
    result is truthy, logs it as the number of new embeddings written.

    ``plugins.embeddings`` is imported inside the thread rather than at the
    top of this module, so a missing or broken embeddings plugin cannot stop
    this module from importing. If that import fails, the failure is logged
    and the "started" flag is cleared so a later call can try again.
    """
    global _embed_thread_started

    def _worker():
        global _embed_thread_started
        import time

        try:
            from plugins.embeddings import _do_embed_images
        except Exception:
            LOG.exception(
                "embed_worker: cannot import plugins.embeddings; worker not running"
            )
            # Let the next caller retry instead of leaving a dead worker
            # permanently marked as started.
            with _embed_thread_lock:
                _embed_thread_started = False
            return

        while True:
            try:
                n = _do_embed_images(limit=50)
                if n:
                    LOG.info("embed_worker: wrote %s new embeddings", n)
            except Exception as e:
                # Best-effort polling loop: log the failure and keep going.
                LOG.exception("embed_worker: %s", e)
            time.sleep(2)

    # The lock makes check-then-start atomic across concurrent callers.
    with _embed_thread_lock:
        if _embed_thread_started:
            return
        t = threading.Thread(target=_worker, name="embed-worker", daemon=True)
        t.start()
        _embed_thread_started = True
        LOG.info("started embed_worker background thread")
|
|
|
|
|
|
def _world(args):
|
|
return args.get("world_id") or DEFAULT_WORLD
|
|
|
|
|
|
def _q_neo4j(query, params=None):
    """Run a Cypher query and return every result record as a plain dict.

    ``params`` is the query parameter mapping; ``None`` (or any falsy value)
    is sent as an empty mapping. Records are converted to dicts before the
    session is closed.
    """
    bound = params or {}
    with get_neo4j().session() as session:
        records = session.run(query, bound)
        return [dict(record) for record in records]
|
|
|
|
|
|
def _q_pg(sql, params=None, fetch=True, commit=False):
    """Execute one SQL statement on a connection from ``get_postgres()``.

    Args:
        sql: Statement text with ``%s`` placeholders.
        params: Placeholder values; a falsy value is sent as ``()``.
        fetch: When true and the statement produced a result set, return
            its rows.
        commit: When true, commit right after executing.

    Returns:
        A list of ``{column_name: value}`` dicts, or ``[]`` when nothing was
        fetched.

    The connection is closed on every exit path, including errors.
    """
    connection = get_postgres()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params or ())
            if commit:
                connection.commit()
            # No result set requested or none produced (e.g. plain INSERT).
            if not fetch or not cursor.description:
                return []
            names = [column[0] for column in cursor.description]
            return [dict(zip(names, row)) for row in cursor.fetchall()]
    finally:
        connection.close()
|
|
|
|
|
|
def _presign(object_key: str) -> str:
    """
    Return a presigned GET URL, valid for one hour, for ``object_key``.

    Two endpoints are involved: ``MINIO_URL`` (the one this process uses)
    and the optional ``MINIO_PUBLIC_URL`` (the one handed to clients, which
    falls back to ``MINIO_URL`` when unset).

    - When both are the same, the shared client from ``get_minio()`` signs
      the URL.
    - Otherwise the URL is signed directly against the *public* endpoint
      with a boto3 S3 client. SigV4 signatures cover the Host header, so
      the URL must be signed for the host the client will actually request;
      signing for the internal host and rewriting the hostname afterwards
      would not verify.

    Raises:
        KeyError: if ``MINIO_URL`` or ``MINIO_BUCKET`` is unset, or (public
            path only) ``MINIO_ACCESS_KEY`` / ``MINIO_SECRET_KEY`` is unset.
    """
    internal = os.environ["MINIO_URL"]
    public = os.environ.get("MINIO_PUBLIC_URL", internal)
    bucket = os.environ["MINIO_BUCKET"]

    if public == internal:
        # Easy path: same endpoint, reuse the existing MinIO client.
        return get_minio().presigned_get_object(
            bucket, object_key, expires=dt.timedelta(hours=1)
        )

    # Imported only on the path that needs them, so the easy path works
    # even where boto3 is not installed.
    import boto3
    from botocore.config import Config

    parsed = urlparse(public)
    # Keep the URL's own host[:port]. Forcing ":80" when no port is given
    # would yield "https://host:80" for an https public URL. Userinfo and
    # path are dropped so the value is a bare endpoint.
    host_port = parsed.netloc.rpartition("@")[2]
    s3 = boto3.client(
        "s3",
        endpoint_url=f"{parsed.scheme}://{host_port}",
        aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": object_key},
        ExpiresIn=3600,
    )
|
|
|
|
|
|
@REGISTRY.tool(
    name="register_image",
    description="Register an image in the manifest. Idempotent on (image_id). The object must already be in MinIO at the given object_key.",
    input_schema={
        "type": "object",
        "properties": {
            "image_id": {"type": "string", "description": "Caller-chosen unique id"},
            "object_key": {"type": "string", "description": "MinIO object key, e.g. 'characters/aldric.png'"},
            "entity_id": {"type": "string", "description": "Linked Neo4j entity id (Person.id, Location.id, etc.)"},
            "entity_type": {"type": "string", "enum": ["Person", "Faction", "Location", "Item", "Event"]},
            "caption": {"type": "string", "description": "1-3 sentences describing the image for the LLM"},
            "tags": {"type": "array", "items": {"type": "string"}},
            "era": {"type": "string", "description": "Canonical era slug, e.g. '2nd_age'"},
            "world_id": {"type": "string", "default": DEFAULT_WORLD,
                         "description": "World namespace; defaults to 'default'"},
            "width": {"type": "integer"},
            "height": {"type": "integer"},
            "bytes": {"type": "integer"},
        },
        "required": ["image_id", "object_key", "caption"],
    },
)
def register_image(args):
    """Upsert an image manifest row and link the image to its entity.

    Steps:
      1. Insert or update the ``image_manifest`` row keyed by ``image_id``.
         On conflict every descriptive column is refreshed; width, height
         and bytes keep their stored value when the caller omits them.
      2. If both ``entity_id`` and ``entity_type`` are given, MERGE an
         ``Image`` node in Neo4j (namespaced by ``world_id``) and a
         ``DEPICTS`` edge to the entity. If no entity matches, the MATCH
         yields no rows and nothing is created.
      3. Make sure the background embedding worker is running.

    Args:
        args: Tool arguments as described by the input schema above.

    Returns:
        ``{"registered": True, "image_id": ..., "world_id": ...}``.

    Raises:
        KeyError: if ``image_id``, ``object_key`` or ``caption`` is missing.
    """
    world_id = _world(args)
    image_id = args["image_id"]
    caption = args["caption"]
    entity_id = args.get("entity_id")
    entity_type = args.get("entity_type")
    era = args.get("era")

    _q_pg("""
        INSERT INTO image_manifest
            (image_id, world_id, object_key, entity_id, entity_type, caption, tags, era, width, height, bytes)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (image_id) DO UPDATE
        SET world_id = EXCLUDED.world_id,
            object_key = EXCLUDED.object_key,
            entity_id = EXCLUDED.entity_id,
            entity_type = EXCLUDED.entity_type,
            caption = EXCLUDED.caption,
            tags = EXCLUDED.tags,
            era = EXCLUDED.era,
            width = COALESCE(EXCLUDED.width, image_manifest.width),
            height = COALESCE(EXCLUDED.height, image_manifest.height),
            bytes = COALESCE(EXCLUDED.bytes, image_manifest.bytes)
    """, (
        image_id, world_id, args["object_key"], entity_id,
        # An explicit tags=None is stored as an empty array, same as omitted.
        entity_type, caption, args.get("tags") or [],
        era, args.get("width"), args.get("height"), args.get("bytes"),
    ), fetch=False, commit=True)

    # Link in Neo4j so entity_context can see "this image depicts X".
    # The Image node is also namespaced by world_id.
    if entity_id and entity_type:
        _q_neo4j("""
            MATCH (e {id: $entity_id, world_id: $world_id})
            WHERE $entity_type IN labels(e)
            MERGE (img:Image {id: $image_id, world_id: $world_id})
            SET img.caption = $caption, img.era = $era
            MERGE (img)-[:DEPICTS]->(e)
        """, {
            # The label check stops an unlabelled id match from linking to
            # a node of another type (including an Image with the same id).
            "entity_id": entity_id, "entity_type": entity_type,
            "image_id": image_id, "world_id": world_id,
            # Plain SET (not ON CREATE SET) keeps the node in sync with the
            # manifest row when an image is re-registered.
            "caption": caption, "era": era,
        })

    # Kick off the background embed worker (no-op if already running) so the
    # new image gets picked up for embedding.
    _start_embed_worker_once()
    return {"registered": True, "image_id": image_id, "world_id": world_id}
|
|
|
|
|
|
@REGISTRY.tool(
    name="recall_images",
    description="Recall images for an entity in a given world. Returns a list of {image_id, caption, tags, era, presigned_url}.",
    input_schema={
        "type": "object",
        "properties": {
            "entity_id": {"type": "string", "description": "Person.id / Location.id / etc."},
            "tag": {"type": "string", "description": "Optional tag filter (e.g. 'portrait', 'battle')"},
            "limit": {"type": "integer", "default": 5},
            "world_id": {"type": "string", "default": DEFAULT_WORLD},
        },
        "required": ["entity_id"],
    },
)
def recall_images(args):
    """Return the newest manifest entries for one entity in one world.

    Args:
        args: ``entity_id`` (required), optional ``tag`` filter, optional
            ``limit`` (default 5) and optional ``world_id``.

    Returns:
        ``{"entity_id", "world_id", "count", "images"}`` where each image is
        ``{image_id, world_id, caption, tags, era, presigned_url}``, ordered
        by ``uploaded_at`` descending.

    Raises:
        KeyError: if ``entity_id`` is missing.
        TypeError, ValueError: if ``limit`` cannot be converted to an int.
    """
    world_id = _world(args)
    entity_id = args["entity_id"]

    # An explicit None would become "LIMIT NULL" (no limit at all) and a
    # negative value is rejected by Postgres; normalise both.
    limit = args.get("limit")
    limit = 5 if limit is None else max(0, int(limit))

    # Only constant SQL fragments are concatenated; every value is bound.
    conditions = "entity_id = %s AND world_id = %s"
    params = [entity_id, world_id]
    tag = args.get("tag")
    if tag:
        conditions += " AND %s = ANY(tags)"
        params.append(tag)
    params.append(limit)

    rows = _q_pg(
        "SELECT image_id, world_id, caption, tags, era, object_key "
        "FROM image_manifest "
        "WHERE " + conditions + " "
        "ORDER BY uploaded_at DESC LIMIT %s",
        tuple(params),
    )

    images = [
        {
            "image_id": row["image_id"],
            "world_id": row["world_id"],
            "caption": row["caption"],
            "tags": row["tags"],
            "era": row["era"],
            "presigned_url": _presign(row["object_key"]),
        }
        for row in rows
    ]
    return {"entity_id": entity_id, "world_id": world_id, "count": len(images), "images": images}
|
|
|
|
|
|
@REGISTRY.tool(
    name="search_images_by_caption",
    description="Find images whose caption or tags contain a substring, in a given world. Use this when the LLM doesn't know the exact entity id.",
    input_schema={
        "type": "object",
        "properties": {
            "q": {"type": "string", "description": "Substring to search for in caption or tags"},
            "limit": {"type": "integer", "default": 5},
            "world_id": {"type": "string", "default": DEFAULT_WORLD},
        },
        "required": ["q"],
    },
)
def search_images_by_caption(args):
    """Case-insensitive substring search over captions and tags in one world.

    Args:
        args: ``q`` (required substring), optional ``limit`` (default 5)
            and optional ``world_id``.

    Returns:
        ``{"q", "world_id", "count", "images"}`` where each image is
        ``{image_id, world_id, entity_id, entity_type, caption, tags, era,
        presigned_url}``, ordered by ``uploaded_at`` descending.

    Raises:
        KeyError: if ``q`` is missing.
        TypeError, ValueError: if ``limit`` cannot be converted to an int.
    """
    world_id = _world(args)
    query = args["q"]

    # q is a literal substring, so neutralise the LIKE metacharacters.
    # Backslash is the default LIKE escape character in PostgreSQL and must
    # be escaped first.
    literal = (
        str(query)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    like = f"%{literal}%"

    # An explicit None would become "LIMIT NULL" (no limit at all) and a
    # negative value is rejected by Postgres; normalise both.
    limit = args.get("limit")
    limit = 5 if limit is None else max(0, int(limit))

    rows = _q_pg("""
        SELECT image_id, world_id, entity_id, entity_type, caption, tags, era, object_key
        FROM image_manifest
        WHERE world_id = %s
          AND (caption ILIKE %s OR EXISTS (SELECT 1 FROM unnest(tags) tag WHERE tag ILIKE %s))
        ORDER BY uploaded_at DESC LIMIT %s
    """, (world_id, like, like, limit))

    images = [
        {
            "image_id": row["image_id"],
            "world_id": row["world_id"],
            "entity_id": row["entity_id"],
            "entity_type": row["entity_type"],
            "caption": row["caption"],
            "tags": row["tags"],
            "era": row["era"],
            "presigned_url": _presign(row["object_key"]),
        }
        for row in rows
    ]
    return {"q": query, "world_id": world_id, "count": len(images), "images": images}
|
|
|
|
|
|
def register(registry):
    """Do nothing with ``registry`` and return ``None``.

    The tools in this module are declared with the ``@REGISTRY.tool``
    decorators above, so there is nothing left to register here.
    NOTE(review): presumably the plugin loader expects every plugin to
    expose ``register(registry)`` — confirm against the loader.
    """
    return None
|