Files
lore-engine-poc/workers/encounter-processor/main.go
Hermes adbb6f0cce feat(substrate): Phase 1 merge — Redis + 8 Go workers + nsc plugin
Ports the GraphMCP-Example substrate into lore-engine-poc:

- 8 Go workers under workers/ (discord-connector, discord-filter, lore-watcher, ingestion-worker, entity-extractor, lore-extractor, encounter-processor, mcp-server), each with Dockerfile + go.mod

- 3 Go unit-test files (encounter-processor, ingestion-worker, lore-extractor) — other 5 workers rely on integration tests via the live stack

- plugins/nsc.py: thin httpx proxy from gateway to lore-mcp-server:9000, exposes all 11 inherited GraphMCP tools (input schemas verbatim from mcp-server/main.go)

- docker-compose.yml: adds lore-redis + lore-mcp-server + the 7 worker services (lore- prefix to avoid clash with other GraphMCP stacks)

- verify-merge.sh (171 LOC, 7 pass conditions) + docs/VERIFICATION.md

- tests/contract/test_graphmcp_tool_contracts.py (15 tests; skipped while the stack is down — TDD pattern; they become active once docker compose up brings the stack up)

- README.md + test.sh updated for the merged service inventory

Leader notes (2026-06-27 03:50):

- The worker self-blocked as review-required after 2 runs (run #7 exhausted its 120-iteration budget; run #8 staged 40 files and reported the change as shippable).

- Tests are SKIPPED until docker compose up — worker chose that pattern over mocking (consistent with the lore-engine-poc project convention). To activate, run `docker compose up -d --build && pytest tests/contract/`.

- File Scope reconciliation: story said gateway/plugins/nsc/__init__.py; worker shipped plugins/nsc.py (flat file). Justified by the existing plugins/ convention in lore-engine-poc (server.py glob("*.py")). A future PR could split nsc into a package once server.py learns __init__.py discovery.

- The nsc plugin exposes 11 tools (not 8) — the AC said "8", but the worker enumerated all 11 tools present in mcp-server/main.go. The 3 encounter-specific tools (list_encounters, search_encounters, get_encounter) were included for consistency. Story AC #2 reads "≥ 8 GraphMCP tools", so this exceeds the AC.

Refs: S2-phase-1-substrate-merge, milestone #64 P1 — Substrate merge
2026-06-27 03:48:54 +00:00

531 lines
17 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"time"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
"github.com/redis/go-redis/v9"
)
// httpClient is the client used for LLM requests. Its 30-second timeout bounds
// each request end-to-end, in addition to any deadline on the request context.
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
}
// ── Config ────────────────────────────────────────────────────────────────────
// Config holds the worker's runtime settings. configFromEnv populates it from
// environment variables.
type Config struct {
	// Redis connection URL plus the stream, consumer group and consumer name
	// used for XREADGROUP.
	RedisURL string
	Stream   string
	Group    string
	Consumer string

	// Neo4j endpoint and basic-auth credentials.
	Neo4jURL  string
	Neo4jUser string
	Neo4jPass string

	// Base URL of the LLM server (its /v1/chat/completions endpoint is called)
	// and the model name sent with each request.
	LLMURL   string
	LLMModel string
}
// configFromEnv builds a Config from environment variables. Every setting falls
// back (via getEnv) to a default that targets the service hostnames redis, neo4j
// and ollama-cpu when its variable is unset or empty.
// NOTE(review): the NEO4J_PASSWORD default "changeme" is a placeholder; set the
// variable explicitly in any real deployment.
func configFromEnv() Config {
	var cfg Config
	cfg.RedisURL = getEnv("REDIS_URL", "redis://redis:6379")
	cfg.Stream = getEnv("REDIS_STREAM", "raw.encounters")
	cfg.Group = getEnv("REDIS_GROUP", "encounter-processing")
	cfg.Consumer = getEnv("CONSUMER_NAME", "encounter-processor-1")
	cfg.Neo4jURL = getEnv("NEO4J_URL", "bolt://neo4j:7687")
	cfg.Neo4jUser = getEnv("NEO4J_USER", "neo4j")
	cfg.Neo4jPass = getEnv("NEO4J_PASSWORD", "changeme")
	cfg.LLMURL = getEnv("LLM_URL", "http://ollama-cpu:11435")
	cfg.LLMModel = getEnv("LLM_MODEL", "qwen2.5:3b")
	return cfg
}
// ── Context Logger & Entity Normalizer ────────────────────────────────────────
// contextKey is an unexported key type, so values stored under it cannot collide
// with context keys defined by other packages, even if the string is the same.
type contextKey string

// loggerKey is the context key under which the request-scoped logger is stored.
const loggerKey contextKey = "logger"

// contextWithLogger returns a child of ctx that carries l.
func contextWithLogger(ctx context.Context, l *slog.Logger) context.Context {
	return context.WithValue(ctx, loggerKey, l)
}

// loggerFromContext returns the logger attached by contextWithLogger. When ctx
// carries none, it returns slog.Default().
func loggerFromContext(ctx context.Context) *slog.Logger {
	stored, found := ctx.Value(loggerKey).(*slog.Logger)
	if !found {
		return slog.Default()
	}
	return stored
}
// normalizeEntityType maps a free-form entity type reported by the LLM onto one
// of the six graph labels used by this worker: Person, Location, Event, Faction,
// Item or Creature. Matching ignores case and surrounding whitespace. For an
// unrecognised spelling, the label is "" and the boolean is false.
func normalizeEntityType(t string) (string, bool) {
	var label string
	switch key := strings.ToLower(strings.TrimSpace(t)); key {
	case "person", "people", "npc", "deity", "character":
		label = "Person"
	case "location", "locations", "place", "dungeon", "city", "region", "landmark":
		label = "Location"
	case "event", "events", "battle", "ceremony", "occurrence":
		label = "Event"
	case "faction", "factions", "guild", "kingdom", "order", "group":
		label = "Faction"
	case "item", "items", "weapon", "artifact", "magical item":
		label = "Item"
	case "creature", "creatures", "monster", "beast":
		label = "Creature"
	}
	// Every recognised spelling assigns a non-empty label, so "" means no match.
	return label, label != ""
}
// ── System prompt ─────────────────────────────────────────────────────────────
// encounterSystemPrompt is the system message sent with every extraction request
// made by extractEntities. It asks the model for a single JSON object with
// "entities" and "relations" arrays. Only "entities" is decoded, because
// ExtractionResult has no relations field. The type labels listed here should
// stay in sync with normalizeEntityType, which maps the model's type strings
// back onto graph labels.
const encounterSystemPrompt = `You are a D&D entity extraction engine. Given an encounter summary, extract named entities from the game world that were discussed, discovered, or involved.
Return ONLY valid JSON in this exact shape, no other text:
{
"entities": [
{"name": "Iron Council", "type": "Faction"},
{"name": "Mardsville", "type": "Location"}
],
"relations": []
}
Entity types (use exactly these labels):
Person — a named character, NPC, or deity
Location — a named place, dungeon, city, region, or landmark
Event — a named battle, ceremony, or significant occurrence
Faction — a named guild, kingdom, order, or group
Item — a named weapon, artifact, or magical item
Creature — a named or typed monster or beast
Rules:
- Only extract entities explicitly named in the summary.
- Return {"entities": [], "relations": []} if no named entities are found.
- Do not invent names not present in the text.`
// ── LLM entity extraction ─────────────────────────────────────────────────────
// Wire types for the chat-completions call and for the JSON object that the
// model is prompted to return.
type (
	// chatMessage is one entry in the request's message list.
	chatMessage struct {
		Role    string `json:"role"`
		Content string `json:"content"`
	}

	// chatRequest is the POST body sent to /v1/chat/completions.
	chatRequest struct {
		Model    string        `json:"model"`
		Messages []chatMessage `json:"messages"`
		Stream   bool          `json:"stream"`
	}

	// chatResponse decodes only the part of the reply this worker reads: the
	// message content of each choice.
	chatResponse struct {
		Choices []struct {
			Message struct {
				Content string `json:"content"`
			} `json:"message"`
		} `json:"choices"`
	}

	// Entity is one named entity reported by the model.
	Entity struct {
		Name string `json:"name"`
		Type string `json:"type"`
	}

	// ExtractionResult is the decoded model output. Any "relations" array in
	// the reply is ignored.
	ExtractionResult struct {
		Entities []Entity `json:"entities"`
	}
)
// extractEntities asks the LLM at cfg.LLMURL (its /v1/chat/completions endpoint)
// to list the named entities mentioned in summary, using encounterSystemPrompt.
//
// The following are returned as errors:
//   - transport failures;
//   - non-2xx HTTP statuses;
//   - undecodable response envelopes;
//   - responses without choices.
//
// A reply whose content cannot be parsed as the requested JSON object is
// treated as "no entities": it is logged, and an empty, non-nil result is
// returned with a nil error, so one bad generation does not fail the encounter.
func extractEntities(ctx context.Context, cfg Config, summary string) (*ExtractionResult, error) {
	logger := loggerFromContext(ctx)
	payload := chatRequest{
		Model: cfg.LLMModel,
		Messages: []chatMessage{
			{Role: "system", Content: encounterSystemPrompt},
			{Role: "user", Content: summary},
		},
		Stream: false,
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal extract request: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		cfg.LLMURL+"/v1/chat/completions", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("create extract HTTP request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute extract HTTP request: %w", err)
	}
	defer resp.Body.Close()
	// Error responses (e.g. model not loaded yet) are not chat completions.
	// Report the status instead of a misleading decode or "empty response" error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("extract HTTP request returned status %s", resp.Status)
	}
	var cr chatResponse
	if err := json.NewDecoder(resp.Body).Decode(&cr); err != nil {
		return nil, fmt.Errorf("decode extract response: %w", err)
	}
	if len(cr.Choices) == 0 {
		return nil, fmt.Errorf("empty LLM response")
	}

	content := cr.Choices[0].Message.Content
	// Models often wrap JSON in a Markdown code fence; strip it if present.
	raw := strings.TrimPrefix(strings.TrimSpace(content), "```json")
	raw = strings.TrimPrefix(raw, "```")
	raw = strings.TrimSuffix(raw, "```")
	raw = strings.TrimSpace(raw)

	var result ExtractionResult
	if err := json.Unmarshal([]byte(raw), &result); err != nil {
		// Small models sometimes surround the object with prose or use an
		// unrecognised fence tag (e.g. "```JSON"). Retry once on the outermost
		// {...} span before giving up.
		start, end := strings.Index(raw, "{"), strings.LastIndex(raw, "}")
		result = ExtractionResult{}
		if start < 0 || end <= start || json.Unmarshal([]byte(raw[start:end+1]), &result) != nil {
			logger.Warn("LLM returned non-JSON", "err", err, "raw", content)
			return &ExtractionResult{}, nil
		}
	}
	return &result, nil
}
// ── Neo4j write ───────────────────────────────────────────────────────────────
// mergeEncounter creates the Encounter node keyed by $id. The descriptive
// properties (title, type, location_name, timestamp, summary) are written only
// when the node is first created (ON CREATE SET). Reprocessing an entry whose id
// already exists therefore leaves the stored properties unchanged.
const mergeEncounter = `
MERGE (enc:Encounter {id: $id})
ON CREATE SET
enc.title = $title,
enc.type = $type,
enc.location_name = $location,
enc.timestamp = $timestamp,
enc.summary = $summary
`
// resolveEntityQuery looks for a canonical lore entity whose name, or one of
// whose aliases, equals $name. A node qualifies only if it has
// lore_verified = true and carries one of the six entity labels.
//
// At most one row is returned, exposing the node's own name as `canonical`, so a
// match found through an alias still yields the canonical name. When several
// nodes qualify, the one returned is unspecified: there is no ORDER BY before
// LIMIT 1.
//
// NOTE(review): the unlabeled MATCH (e) is likely planned as a scan over all
// nodes; consider a label-specific or indexed lookup if this becomes hot.
const resolveEntityQuery = `
MATCH (e)
WHERE (e.name = $name OR $name IN coalesce(e.aliases, []))
AND e.lore_verified = true
AND any(lbl IN labels(e) WHERE lbl IN ['Person','Location','Faction','Item','Creature','Event'])
RETURN e.name AS canonical LIMIT 1
`
// resolveEntity runs resolveEntityQuery in a read transaction and returns the
// canonical name of the lore-verified entity matching name (by name or alias).
//
// It returns "" on a miss. A failed lookup also returns "", because callers then
// fall back to provisional nodes. The error is logged so that a transient Neo4j
// failure, which can lead to a duplicate provisional node, is distinguishable
// from a genuine miss.
func resolveEntity(ctx context.Context, session neo4j.SessionWithContext, name string) string {
	result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (any, error) {
		res, err := tx.Run(ctx, resolveEntityQuery, map[string]any{"name": name})
		if err != nil {
			return "", err
		}
		if !res.Next(ctx) {
			// No row: either a genuine miss or a streaming error reported by Err.
			return "", res.Err()
		}
		value, _ := res.Record().Get("canonical")
		canonical, _ := value.(string)
		return canonical, nil
	})
	if err != nil {
		loggerFromContext(ctx).Warn("canonical entity lookup failed", "name", name, "err", err)
		return ""
	}
	canonical, _ := result.(string)
	return canonical
}
// parseParticipants splits a comma-separated participant list, trims the
// whitespace around each name and drops names that end up empty. It returns nil
// when no names remain.
func parseParticipants(s string) []string {
	var names []string
	rest := s
	for {
		field, tail, found := strings.Cut(rest, ",")
		if trimmed := strings.TrimSpace(field); trimmed != "" {
			names = append(names, trimmed)
		}
		if !found {
			return names
		}
		rest = tail
	}
}
// mergeWitnessedCanonical links an already-resolved canonical entity to the
// encounter with a WITNESSED edge. The edge's `at` property is set only when the
// edge is first created.
//
// The MATCH repeats the lore_verified and label filters of resolveEntityQuery.
// This links only the kind of node the resolver returned, and not an unverified
// or infrastructure node that happens to share the same name.
const mergeWitnessedCanonical = `
MATCH (e {name: $canonical})
WHERE e.lore_verified = true
AND any(lbl IN labels(e) WHERE lbl IN ['Person','Location','Faction','Item','Creature','Event'])
MERGE (enc:Encounter {id: $encID})
MERGE (e)-[w:WITNESSED]->(enc)
ON CREATE SET w.at = $timestamp
`
// mergeWitnessedProvisional links a participant that did not resolve to a
// canonical entity. The Person is MERGEd by name, so an existing Person with
// that name is reused. Only a newly created node is stamped
// lore_verified = false and source "encounter". The WITNESSED edge is MERGEd
// per (person, encounter) pair, and its `at` property records the timestamp
// given when the edge was first created.
const mergeWitnessedProvisional = `
MERGE (p:Person {name: $name})
ON CREATE SET p.lore_verified = false, p.source = "encounter"
MERGE (enc:Encounter {id: $encID})
MERGE (p)-[w:WITNESSED]->(enc)
ON CREATE SET w.at = $timestamp
`
// mergeLocationCanonical links the encounter to the canonical entity its
// location resolved to, via an OCCURRED_AT edge.
//
// The MATCH repeats the lore_verified and label filters of resolveEntityQuery.
// This keeps the edge on the kind of node the resolver returned, and not on an
// unverified or infrastructure node that shares the same name.
const mergeLocationCanonical = `
MATCH (loc {name: $canonical})
WHERE loc.lore_verified = true
AND any(lbl IN labels(loc) WHERE lbl IN ['Person','Location','Faction','Item','Creature','Event'])
MATCH (enc:Encounter {id: $encID})
MERGE (enc)-[:OCCURRED_AT]->(loc)
`
// mergeLocationProvisional links the encounter to a Location node for a location
// name that did not resolve to a canonical entity. The Location is MERGEd by
// name, so an existing one is reused. Only a newly created node is stamped
// lore_verified = false and source "encounter".
//
// The Encounter is MATCHed before the MERGE because Cypher rejects a MATCH that
// directly follows an updating clause such as MERGE unless a WITH separates
// them.
const mergeLocationProvisional = `
MATCH (enc:Encounter {id: $encID})
MERGE (loc:Location {name: $location})
ON CREATE SET loc.lore_verified = false, loc.source = "encounter"
MERGE (enc)-[:OCCURRED_AT]->(loc)
`
// mergeEncounterEntities links LLM-extracted entities to the encounter via
// FEATURED edges. $entities is a list of {name, type} maps.
//
// Each entity is MERGEd by name alone (no label), so an existing node with that
// name is reused. A newly created node is stamped with its type, source
// "encounter" and lore_verified = false. apoc.create.addLabels then adds
// ent.type as a label. The caller is expected to pass only normalised labels
// (Person, Location, ...).
//
// The WHERE guard drops infrastructure node types that also carry a name
// property, so they are never re-labelled or featured. Topic is listed
// explicitly, alongside the document, chunk, message and encounter labels.
const mergeEncounterEntities = `
MATCH (enc:Encounter {id: $encID})
WITH enc
UNWIND $entities AS ent
MERGE (e {name: ent.name})
ON CREATE SET e.type = ent.type, e.source = "encounter", e.lore_verified = false
WITH enc, e, ent
WHERE NOT (e:LoreDocument OR e:LoreChunk OR e:Chunk OR e:Message OR e:Encounter OR e:Topic)
CALL apoc.create.addLabels(e, [ent.type]) YIELD node
MERGE (enc)-[:FEATURED]->(node)
`
// writeToGraph records one encounter in Neo4j as a sequence of independent
// write transactions:
//
//  1. MERGE the Encounter node (mergeEncounter).
//  2. If location is non-blank, add an OCCURRED_AT edge to the canonical entity
//     it resolves to, or else to a provisional Location.
//  3. For each non-empty participant, add a WITNESSED edge from the canonical
//     entity it resolves to, or else from a provisional Person.
//  4. If entities is non-empty, add FEATURED edges (mergeEncounterEntities).
//
// Only a failure in step 1 is returned. Failures in steps 2-4 are logged and
// skipped, so the caller sees success even when some edges were not written.
func writeToGraph(ctx context.Context, session neo4j.SessionWithContext,
	encID, title, encType, location, timestamp, summary string,
	participants []string, entities []map[string]any) error {
	logger := loggerFromContext(ctx)

	// write runs one parameterised statement in its own managed write transaction.
	write := func(query string, params map[string]any) error {
		_, txErr := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (any, error) {
			_, runErr := tx.Run(ctx, query, params)
			return nil, runErr
		})
		return txErr
	}

	encounterParams := map[string]any{
		"id":        encID,
		"title":     title,
		"type":      encType,
		"location":  location,
		"timestamp": timestamp,
		"summary":   summary,
	}
	if err := write(mergeEncounter, encounterParams); err != nil {
		return fmt.Errorf("merge encounter: %w", err)
	}

	if strings.TrimSpace(location) != "" {
		canonical := resolveEntity(ctx, session, location)
		var locErr error
		if canonical == "" {
			locErr = write(mergeLocationProvisional, map[string]any{
				"location": location, "encID": encID,
			})
		} else {
			locErr = write(mergeLocationCanonical, map[string]any{
				"canonical": canonical, "encID": encID,
			})
		}
		switch {
		case locErr != nil:
			logger.Warn("merge encounter location failed", "enc_id", encID, "location", location, "err", locErr)
		case canonical == "":
			logger.Info("location created as provisional", "location", location)
		default:
			logger.Info("location resolved to canonical", "raw", location, "canonical", canonical)
		}
	}

	for _, participant := range participants {
		if participant == "" {
			continue
		}
		canonical := resolveEntity(ctx, session, participant)
		var partErr error
		if canonical == "" {
			partErr = write(mergeWitnessedProvisional, map[string]any{
				"name": participant, "encID": encID, "timestamp": timestamp,
			})
		} else {
			partErr = write(mergeWitnessedCanonical, map[string]any{
				"canonical": canonical, "encID": encID, "timestamp": timestamp,
			})
		}
		switch {
		case partErr != nil:
			logger.Warn("merge participant failed", "name", participant, "enc_id", encID, "err", partErr)
		case canonical == "":
			logger.Info("participant created as provisional", "name", participant)
		default:
			logger.Info("participant resolved to canonical", "raw", participant, "canonical", canonical)
		}
	}

	if len(entities) == 0 {
		return nil
	}
	if err := write(mergeEncounterEntities, map[string]any{
		"encID": encID, "entities": entities,
	}); err != nil {
		logger.Warn("merge encounter entities failed", "enc_id", encID, "err", err)
	}
	return nil
}
// ── Main ──────────────────────────────────────────────────────────────────────
func main() {
cfg := configFromEnv()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
slog.SetDefault(logger)
ctx := context.Background()
ctx = contextWithLogger(ctx, logger)
rOpts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("invalid redis URL", "err", err)
os.Exit(1)
}
rdb := redis.NewClient(rOpts)
if err := rdb.XGroupCreateMkStream(ctx, cfg.Stream, cfg.Group, "0").Err(); err != nil {
if !strings.Contains(err.Error(), "BUSYGROUP") {
logger.Warn("failed to create redis stream group", "err", err)
}
}
driver, err := neo4j.NewDriverWithContext(cfg.Neo4jURL,
neo4j.BasicAuth(cfg.Neo4jUser, cfg.Neo4jPass, ""))
if err != nil {
logger.Error("neo4j driver error", "err", err)
os.Exit(1)
}
defer driver.Close(ctx)
logger.Info("encounter-processor started", "stream", cfg.Stream, "group", cfg.Group)
// Reclaim any messages delivered but not ACK'd before last shutdown.
// Bounded to maxRecoveryPasses so a persistently failing message
// (e.g. LLM not yet ready) does not block the live loop on startup.
const maxRecoveryPasses = 5
for pass := 0; pass < maxRecoveryPasses; pass++ {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: cfg.Group,
Consumer: cfg.Consumer,
Streams: []string{cfg.Stream, "0"},
Count: 5,
}).Result()
if err != nil || len(results) == 0 || len(results[0].Messages) == 0 {
break
}
for _, msg := range results[0].Messages {
logger.Info("reprocessing pending message", "id", msg.ID)
if err := processMessage(ctx, cfg, driver, msg); err != nil {
logger.Error("encounter processing failed (pending)", "id", msg.ID, "err", err)
continue
}
if err := rdb.XAck(ctx, cfg.Stream, cfg.Group, msg.ID).Err(); err != nil {
logger.Error("failed to acknowledge pending message", "id", msg.ID, "err", err)
}
}
}
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: cfg.Group,
Consumer: cfg.Consumer,
Streams: []string{cfg.Stream, ">"},
Count: 5,
Block: 5 * time.Second,
}).Result()
if err == redis.Nil {
continue
}
if err != nil {
logger.Error("redis read error", "err", err)
time.Sleep(2 * time.Second)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
if err := processMessage(ctx, cfg, driver, msg); err != nil {
logger.Error("encounter processing failed", "id", msg.ID, "err", err)
continue
}
if err := rdb.XAck(ctx, cfg.Stream, cfg.Group, msg.ID).Err(); err != nil {
logger.Error("failed to acknowledge message", "id", msg.ID, "err", err)
}
}
}
}
}
// processMessage turns one stream entry into graph writes.
//
// It reads these string fields from msg.Values, falling back to a default when
// a field is absent or not a string: id, title, type, location, participants
// (comma-separated), summary and timestamp.
//
// When summary is non-blank, it is sent to the LLM. Each extracted entity with
// a non-blank name and a supported type is passed on for FEATURED linking;
// names are trimmed so that padded duplicates collapse onto the same node.
// Extraction failures are logged, and the encounter is still written.
//
// An error is returned only when writeToGraph reports one.
func processMessage(ctx context.Context, cfg Config, driver neo4j.DriverWithContext,
	msg redis.XMessage) error {
	logger := loggerFromContext(ctx)
	vals := msg.Values
	encID := strVal(vals, "id", msg.ID)
	title := strVal(vals, "title", "Unnamed Encounter")
	encType := strVal(vals, "type", "conversation")
	location := strVal(vals, "location", "")
	participantStr := strVal(vals, "participants", "")
	summary := strVal(vals, "summary", "")
	timestamp := strVal(vals, "timestamp", time.Now().UTC().Format(time.RFC3339))
	participants := parseParticipants(participantStr)

	// LLM extraction on summary → FEATURED entity list.
	var entities []map[string]any
	if strings.TrimSpace(summary) != "" {
		result, err := extractEntities(ctx, cfg, summary)
		if err != nil {
			logger.Warn("entity extraction failed", "enc_id", encID, "err", err)
		} else {
			for _, e := range result.Entities {
				name := strings.TrimSpace(e.Name)
				if name == "" {
					// A blank name would MERGE a node keyed on "" and feature it
					// from every encounter that triggers the same model quirk.
					logger.Warn("skipping entity with empty name", "enc_id", encID, "type", e.Type)
					continue
				}
				normType, ok := normalizeEntityType(e.Type)
				if !ok {
					logger.Warn("skipping unsupported entity type", "enc_id", encID, "name", e.Name, "type", e.Type)
					continue
				}
				entities = append(entities, map[string]any{"name": name, "type": normType})
			}
		}
	}

	session := driver.NewSession(ctx, neo4j.SessionConfig{})
	defer session.Close(ctx)
	if err := writeToGraph(ctx, session, encID, title, encType, location,
		timestamp, summary, participants, entities); err != nil {
		return fmt.Errorf("write encounter to graph: %w", err)
	}
	logger.Info("processed encounter", "id", encID, "title", title,
		"participants", participantStr, "featured_entities", len(entities))
	return nil
}
// ── Helpers ───────────────────────────────────────────────────────────────────
// getEnv returns the value of the environment variable key. It returns fallback
// when the variable is unset or set to the empty string.
func getEnv(key, fallback string) string {
	value, present := os.LookupEnv(key)
	if !present || value == "" {
		return fallback
	}
	return value
}
// strVal returns m[key] when that entry exists and holds a string. Otherwise it
// returns fallback: for a missing key, a nil map, or a non-string value. A
// present empty string is returned as-is.
func strVal(m map[string]any, key, fallback string) string {
	s, isString := m[key].(string)
	if !isString {
		return fallback
	}
	return s
}