- examples/llm_consumer.py: raw httpx + urllib driver — discovers tools via tools/list, runs the tool-use loop against LiteLLM (minimax-m3), saves per-question JSON traces. No agent framework per task scope. - examples/system_prompt.txt: 5 question types + tool protocol (per lore-engine/docs/07-reasoning-harness.md). - examples/run_questions.sh: bash driver — exits 0 iff all 5 questions pass hand-verified correctness against the seed data. - examples/results/*.json: traces from a real end-to-end run, all 5 PASS. - examples/REPORT.md: per-question ground truth vs answer, with tool-call audit. The model used 9 distinct tools across 5 questions (requirement was >=4); every factual claim is grounded in a tool result; no fabrication.
340 lines
13 KiB
Python
Executable File
340 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
llm_consumer.py — drive the lore-engine MCP gateway end-to-end with a real LLM.
|
|
|
|
For each of 5 question types, this script:
|
|
1. Sends the question to a LiteLLM-proxied LLM (model: minimax-m3) with the
|
|
gateway tools (discovered at runtime via tools/list; 16 at the time of writing) exposed as OpenAI-style function calls.
|
|
2. Runs the tool-use loop: model decides which tool(s) to call -> we execute
|
|
them via JSON-RPC against the gateway -> we feed results back -> repeat
|
|
until the model produces a final answer (finish_reason != tool_calls).
|
|
3. Saves the conversation trace (prompt, tool calls, tool results, final
|
|
answer) to a JSON file under examples/results/.
|
|
|
|
The script is intentionally raw httpx + urllib — no agent framework. The
|
|
reasoning harness lives in the system prompt (examples/system_prompt.txt).
|
|
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
# ─── Config ──────────────────────────────────────────────────────────────────
|
|
|
|
# Service endpoints and credentials; every value can be overridden through
# an environment variable of the same name.
GATEWAY_URL = os.getenv("GATEWAY_URL", "http://localhost:8765/mcp")
LITELLM_URL = os.getenv("LITELLM_URL", "http://localhost:4000/v1")
LITELLM_MODEL = os.getenv("LITELLM_MODEL", "minimax-m3")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "sk-no-auth-needed")

# Upper bound on chat round-trips spent on a single question.
MAX_TURNS = int(os.getenv("MAX_TURNS", "8"))

# Paths are anchored to this script's directory, not the current working dir.
SCRIPT_DIR = Path(__file__).resolve().parent
SYSTEM_PROMPT_PATH = SCRIPT_DIR / "system_prompt.txt"
RESULTS_DIR = SCRIPT_DIR / "results"
|
|
|
|
|
|
# ─── Gateway client (JSON-RPC over HTTP) ─────────────────────────────────────
|
|
|
|
def gateway_list_tools(client: httpx.Client) -> list[dict]:
    """Return [{name, description, inputSchema}, ...] for every gateway tool.

    MCP's ``tools/list`` is cursor-paginated: a response may carry
    ``result.nextCursor``, in which case the request is repeated with
    ``params.cursor`` set to that value. Pages are concatenated in order
    until a response arrives without ``nextCursor``.

    Args:
        client: HTTP client used to POST JSON-RPC requests to GATEWAY_URL.

    Returns:
        All tool definitions, in the order the gateway returned them.

    Raises:
        httpx.HTTPStatusError: the gateway answered with a non-2xx status.
        RuntimeError: the gateway returned a JSON-RPC ``error`` object, a
            response without ``result``, or repeated a cursor (which would
            otherwise make this loop forever).
    """
    tools: list[dict] = []
    seen_cursors: set = set()
    cursor = None
    request_id = 1
    while True:
        payload = {"jsonrpc": "2.0", "id": request_id, "method": "tools/list"}
        if cursor is not None:
            payload["params"] = {"cursor": cursor}
        r = client.post(GATEWAY_URL, json=payload, timeout=30)
        r.raise_for_status()
        data = r.json()
        # Surface protocol-level failures explicitly instead of as KeyError.
        if "error" in data:
            raise RuntimeError(f"gateway tools/list failed: {data['error']}")
        if "result" not in data:
            raise RuntimeError(f"gateway tools/list returned no result: {str(data)[:500]}")
        result = data["result"]
        tools.extend(result["tools"])
        cursor = result.get("nextCursor")
        if not cursor:
            return tools
        if cursor in seen_cursors:
            raise RuntimeError(f"gateway tools/list repeated cursor {cursor!r}")
        seen_cursors.add(cursor)
        request_id += 1
|
|
|
|
|
|
def gateway_call_tool(client: httpx.Client, name: str, arguments: dict) -> dict:
    """Invoke a single MCP tool via ``tools/call``; return the parsed payload.

    The first content item's ``text`` is decoded as JSON when possible and
    otherwise returned as ``{"_raw_text": text}``. Failure modes are
    surfaced as plain dicts so the model can read its own mistakes:

    * ``{"_gateway_error": ...}`` — a JSON-RPC ``error`` response, or a
      response carrying neither a usable ``result`` nor ``error``.
    * ``{"_empty": True}`` — the result had no content items.
    * ``{"_tool_error": True, "detail": <payload>}`` — the tool set
      ``result.isError``. On isError the gateway puts the error text in
      ``content[0].text``. Without this wrapper it would come back looking
      exactly like a successful payload.

    Args:
        client: HTTP client used to POST the JSON-RPC request to GATEWAY_URL.
        name: Tool name as advertised by ``tools/list``.
        arguments: Tool arguments (sent as the JSON-RPC ``arguments`` object).

    Returns:
        The decoded tool payload, or one of the marker dicts above.

    Raises:
        httpx.HTTPStatusError: the gateway answered with a non-2xx status.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    r = client.post(GATEWAY_URL, json=payload, timeout=60)
    r.raise_for_status()
    body = r.json()
    if "error" in body:
        return {"_gateway_error": body["error"]}

    result = body.get("result")
    if not isinstance(result, dict):
        return {"_gateway_error": f"response has no usable result: {str(body)[:500]}"}

    content = result.get("content") or []
    if not content:
        parsed = {"_empty": True}
    else:
        text = content[0].get("text", "")
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            parsed = {"_raw_text": text}

    if result.get("isError"):
        return {"_tool_error": True, "detail": parsed}
    return parsed
|
|
|
|
|
|
# ─── LiteLLM (OpenAI-compatible) chat completion with tool use ───────────────
|
|
|
|
def _to_openai_tools(gateway_tools: list[dict]) -> list[dict]:
|
|
"""Convert MCP tool defs to OpenAI function-calling format."""
|
|
return [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": t["name"],
|
|
"description": t["description"],
|
|
"parameters": t["inputSchema"],
|
|
},
|
|
}
|
|
for t in gateway_tools
|
|
]
|
|
|
|
|
|
def _chat(
    messages: list[dict],
    openai_tools: list[dict],
    *,
    max_tokens: int = 1024,
    timeout: float = 120,
) -> dict:
    """Run one non-streaming chat completion against the LiteLLM proxy.

    The request offers ``openai_tools`` with ``tool_choice="auto"`` and uses
    ``temperature=0.0``.

    Args:
        messages: Conversation so far, in OpenAI chat format.
        openai_tools: Tool definitions in OpenAI function-calling format.
        max_tokens: Completion token cap sent to the proxy.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: the proxy replied with an HTTP error status. The
            message includes the status code and the (truncated) response
            body. ``str(HTTPError)`` is only "HTTP Error <code>: <reason>",
            so without this the proxy's explanation would be lost.
        urllib.error.URLError / OSError: connection failures and timeouts.
    """
    import urllib.error  # HTTPError is defined here

    body = {
        "model": LITELLM_MODEL,
        "messages": messages,
        "tools": openai_tools,
        "tool_choice": "auto",
        "temperature": 0.0,
        "max_tokens": max_tokens,
    }
    req = urllib.request.Request(
        f"{LITELLM_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {LITELLM_API_KEY}",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        detail = err.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"LiteLLM returned HTTP {err.code}: {detail[:2000]}") from err
|
|
|
|
|
|
# ─── The reasoning loop ───────────────────────────────────────────────────────
|
|
|
|
def _execute_tool_call(tc: dict, client: httpx.Client) -> tuple:
    """Run one model-requested tool call; return what the caller must record.

    Returns ``(tool_message, result_record, call_record)``:

    * ``tool_message`` — the ``role: tool`` message answering ``tc["id"]``.
      One is produced for EVERY call, including malformed ones.
      OpenAI-compatible chat APIs generally reject a conversation in which
      an assistant ``tool_call_id`` has no matching tool message.
    * ``result_record`` — entry for the turn's ``tool_results`` list.
    * ``call_record`` — entry for ``trace["tools_called"]``, or ``None``
      when the call was rejected locally and the gateway was never contacted.

    Calls without a function name, or whose arguments are not a JSON object,
    are not sent to the gateway. The model receives ``{"_call_error": ...}``
    explaining the problem instead of a call made with guessed arguments.
    """
    fn = tc.get("function") or {}
    tool_name = fn.get("name")
    raw_args = fn.get("arguments") or "{}"

    problem = None
    args = raw_args
    if isinstance(raw_args, str):
        try:
            args = json.loads(raw_args)
        except json.JSONDecodeError as e:
            problem = f"tool arguments are not valid JSON: {e}"
    if problem is None and not isinstance(args, dict):
        problem = "tool arguments must be a JSON object"
    if not tool_name:
        problem = "tool call has no function name"

    call_record = None
    if problem is not None:
        result = {"_call_error": problem}
        elapsed_ms = 0
    else:
        t0 = time.perf_counter()  # monotonic clock: immune to wall-clock jumps
        try:
            result = gateway_call_tool(client, tool_name, args)
            tool_err = None
        except Exception as e:
            # Boundary catch: a transport failure becomes data the model can read.
            result = {"_transport_error": str(e)}
            tool_err = str(e)
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        call_record = {
            "tool": tool_name,
            "args": args,
            "elapsed_ms": elapsed_ms,
            "error": tool_err,
        }

    tool_message = {
        "role": "tool",
        "tool_call_id": tc.get("id"),
        "content": json.dumps(result, default=str),
    }
    result_record = {
        "tool": tool_name,
        "args": args,
        "result": result,
        "elapsed_ms": elapsed_ms,
    }
    return tool_message, result_record, call_record


def run_question(
    question: str,
    gateway_tools: list[dict],
    openai_tools: list[dict],
    system_prompt: str,
    client: httpx.Client,
) -> dict:
    """Drive one question through the LLM + gateway tool-use loop.

    Each turn sends the running conversation to the model. If the reply
    contains tool calls, they are executed against the gateway (regardless of
    ``finish_reason``, which some OpenAI-compatible backends do not set to
    "tool_calls"). Their results are appended as ``tool`` messages and the
    loop continues. Otherwise the reply text is the final answer. At most
    MAX_TURNS model calls are made.

    Args:
        question: User question to answer.
        gateway_tools: Raw MCP tool definitions. Not read here; kept for
            interface compatibility.
        openai_tools: The same tools in OpenAI function-calling format.
        system_prompt: Content of the system message.
        client: HTTP client for gateway JSON-RPC calls.

    Returns:
        A trace dict with keys ``question``, ``model``, ``turns``,
        ``tools_called``, ``answer``, ``stopped_reason`` and ``error``.
        ``stopped_reason`` is the final reply's finish_reason, or one of
        ``chat_error``, ``bad_response`` or ``max_turns_exceeded``.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question},
    ]
    trace = {
        "question": question,
        "model": LITELLM_MODEL,
        "turns": [],
        "tools_called": [],  # ordered list of {tool, args, elapsed_ms, error}
        "answer": None,  # final assistant text
        "stopped_reason": None,
        "error": None,
    }

    for turn_idx in range(MAX_TURNS):
        try:
            resp = _chat(messages, openai_tools)
        except Exception as e:
            # Boundary catch: end this question but still return a trace.
            trace["error"] = f"chat call failed: {e}"
            trace["stopped_reason"] = "chat_error"
            return trace

        try:
            choice = resp["choices"][0]
            msg = choice["message"]
            finish = choice.get("finish_reason", "stop")
            tool_calls = msg.get("tool_calls") or []
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            trace["error"] = f"malformed chat response ({e!r}): {str(resp)[:500]}"
            trace["stopped_reason"] = "bad_response"
            return trace

        # Record the assistant message verbatim so we can replay later.
        recorded = {
            "role": "assistant",
            "content": msg.get("content") or "",
            "reasoning_content": msg.get("reasoning_content"),
            "tool_calls": tool_calls,
        }
        messages.append(recorded)
        turn = {"turn": turn_idx, "assistant": recorded}
        trace["turns"].append(turn)

        # ── Tool calls requested: execute each, feed results back, loop. ──
        if tool_calls:
            tool_results = turn.setdefault("tool_results", [])
            for tc in tool_calls:
                tool_msg, result_record, call_record = _execute_tool_call(tc, client)
                messages.append(tool_msg)
                tool_results.append(result_record)
                if call_record is not None:
                    trace["tools_called"].append(call_record)
            continue

        # ── No tool calls: this is the final answer. ──
        trace["answer"] = (msg.get("content") or "").strip()
        trace["stopped_reason"] = finish or "stop"
        return trace

    trace["stopped_reason"] = "max_turns_exceeded"
    return trace
|
|
|
|
|
|
# ─── Ground-truth checks (from seed.py + lore-engine docs) ───────────────────
|
|
|
|
def evaluate(question_id: str, trace: dict) -> dict:
    """Hand-verified correctness checks per the seed data.

    Each branch checks that (a) an appropriate gateway tool appears in
    ``trace["tools_called"]`` and (b) the lower-cased final answer contains
    the expected facts.

    Matching rules:

    * Short keywords ("no", "yes", "true", "0", ...) are matched as whole
      words or phrases. As bare substrings they would match inside unrelated
      text ("no" in "north", "true" in "was_true_at", "0" in "10"), letting
      wrong or fabricated answers pass.
    * q2 also fails an answer phrased as a denial (leading "No",
      "not allied", "no alliance", ...), because "allied" alone also occurs
      in "were not allied".
    * An unknown ``question_id`` produces a failing check. Otherwise the
      check list would be empty and ``all([])`` would report a vacuous PASS.

    Args:
        question_id: One of q1_who_is_aldric, q2_was_allied_230,
            q3_aldric_ancestors, q4_images_of_aldric, q5_consistency_issues.
        trace: A run_question trace; only ``answer`` and ``tools_called``
            are read.

    Returns:
        ``{"question_id", "all_passed", "checks"}``. Every check has
        ``expect`` and ``ok`` keys; q3's name check also carries ``found``
        and ``names_in_answer``.
    """
    import re

    answer = (trace.get("answer") or "").lower()
    tools = [t["tool"] for t in trace.get("tools_called", [])]
    checks = []

    def mentions(*terms: str) -> bool:
        """True if any term appears in the answer not flanked by word chars."""
        return any(
            re.search(rf"(?<!\w){re.escape(term)}(?!\w)", answer) for term in terms
        )

    if question_id == "q1_who_is_aldric":
        checks.append({
            "expect": "entity_context was called",
            "ok": "entity_context" in tools,
        })
        checks.append({
            "expect": "answer mentions Aldric Raventhorne and House Vyr/Thornwall",
            "ok": "aldric" in answer and ("vyr" in answer or "thornwall" in answer),
        })

    elif question_id == "q2_was_allied_230":
        checks.append({
            "expect": "was_true_at was called (or entity_context if model chose to inspect first)",
            "ok": "was_true_at" in tools,
        })
        # Ground truth: ALLIED_WITH house_vyr<->merchants starts 2nd_age.year_100,
        # never ends → true at year_230.
        affirmed = mentions("yes", "allied", "true", "was an alliance", "in force")
        denial_patterns = (
            r"^\W*no\b",  # answer leads with "No"
            r"(?:\bnot|\bnever|n['’]t)\s+(?:been\s+)?allied\b",
            r"\bno\s+alliance\b",
            r"\bnot\s+(?:true|in\s+force)\b",
        )
        denied = any(re.search(p, answer) for p in denial_patterns)
        checks.append({
            "expect": "answer says YES/allied (truth: house_vyr & merchants were allied from year_100 with no end)",
            "ok": affirmed and not denied,
        })

    elif question_id == "q3_aldric_ancestors":
        checks.append({
            "expect": "ancestors_of was called",
            "ok": "ancestors_of" in tools,
        })
        # Ground truth from seed: ancestors = Theron, Maric, Cael, Yssa
        # (lineage chain: Aldric ← Maric ← Theron (parent chain via PARENT_OF);
        # Cael is Maric's father per the lineage table; Yssa is also an
        # ancestor. We accept if AT LEAST 3 of the 4 canonical names appear.)
        canonical = ["theron", "maric", "cael", "yssa"]
        names_found = [n for n in canonical if n in answer]
        checks.append({
            "expect": "answer names at least 3 of {Theron, Maric, Cael, Yssa}",
            "ok": len(names_found) >= 3,
            "found": len(names_found),
            "names_in_answer": names_found,
        })

    elif question_id == "q4_images_of_aldric":
        checks.append({
            "expect": "an image-recall tool was called (recall_images or search_images_by_caption)",
            "ok": any(t in tools for t in ["recall_images", "search_images_by_caption", "search_images_semantic"]),
        })
        # Image ground truth: img_aldric_portrait exists with caption
        # "Portrait of Aldric Raventhorne, Lord of Thornwall..."
        checks.append({
            "expect": "answer mentions Aldric's portrait (caption/presigned URL/etc.)",
            "ok": "aldric" in answer and ("portrait" in answer or "image" in answer or "presigned" in answer or "thornwall" in answer),
        })

    elif question_id == "q5_consistency_issues":
        # The consistency plugin stubs all return {violations: [], count: 0}
        # in v2.T3 (real detection lands in T5). So the truthful answer is
        # "no open issues detected by the current rule set".
        checks.append({
            "expect": "a consistency tool was called (find_contradictions / find_orphans / etc.)",
            "ok": any(t in tools for t in [
                "find_contradictions", "find_anachronisms",
                "find_orphans", "find_ontology_violations",
            ]),
        })
        # Whole-word "no" also covers phrases such as "no open issues" or
        # "no contradictions".
        checks.append({
            "expect": "answer reflects the empty/stubbed results honestly (no fabricated issues)",
            "ok": mentions("no", "none", "zero", "empty", "0"),
        })

    else:
        checks.append({
            "expect": f"known question_id (got {question_id!r})",
            "ok": False,
        })

    all_ok = all(c["ok"] for c in checks)
    return {"question_id": question_id, "all_passed": all_ok, "checks": checks}
|
|
|
|
|
|
# ─── CLI ─────────────────────────────────────────────────────────────────────
|
|
|
|
def main() -> int:
    """CLI entry point: answer one question, save its trace, print a summary.

    Returns:
        0 when every evaluation check passed, 2 otherwise. An uncaught
        exception (e.g. the gateway being unreachable during tool discovery)
        exits with Python's default status 1 instead.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--question-id", required=True)
    ap.add_argument("--question", required=True)
    ap.add_argument("--out", required=True, help="path to write JSON result")
    args = ap.parse_args()

    # Explicit UTF-8: the default encoding is locale-dependent (e.g. cp1252 on
    # Windows) and would garble or reject non-ASCII text in a UTF-8 prompt file.
    system_prompt = SYSTEM_PROMPT_PATH.read_text(encoding="utf-8").strip()
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    with httpx.Client() as client:
        gateway_tools = gateway_list_tools(client)
        openai_tools = _to_openai_tools(gateway_tools)
        trace = run_question(args.question, gateway_tools, openai_tools,
                             system_prompt, client)

    trace["evaluation"] = evaluate(args.question_id, trace)

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(trace, indent=2, default=str), encoding="utf-8")

    # Human-readable stdout summary so run_questions.sh shows progress.
    print(f"\n── {args.question_id} ──")
    print(f"Q: {args.question}")
    print(f"Tools called: {[t['tool'] for t in trace['tools_called']]}")
    print(f"Stopped: {trace['stopped_reason']}")
    print(f"Answer: {trace['answer'] or '(empty)'}")
    eval_ = trace["evaluation"]
    print(f"Eval: {'PASS' if eval_['all_passed'] else 'FAIL'}")
    for c in eval_["checks"]:
        print(f"  [{'✓' if c['ok'] else '✗'}] {c['expect']}")
    if trace.get("error"):
        print(f"ERROR: {trace['error']}")

    return 0 if eval_["all_passed"] else 2
|
|
|
|
|
|
if __name__ == "__main__":
    # main()'s return value becomes the process exit status.
    raise SystemExit(main())
|