Files
lore-engine-poc/plugins/trade.py
hermes 8e8503e8f9 T3: consistency plugin skeleton (4 violation tools, 4 Neo4j labels)
- plugins/consistency.py: find_contradictions, find_anachronisms, find_orphans,
  find_ontology_violations — all stub queries returning the
  {violations, count} envelope, ready for T5 to populate violation nodes.
  Two of the tools accept a severity filter (any|critical|major|minor).
- neo4j/init.cypher: uniqueness constraints on Contradiction, Anachronism,
  Orphan, OntologyViolation (id) + severity/status indexes on the
  contradiction/anachronism labels.
- README.md: bump plugin list to five, replace 'no consistency engine' copy
  with 'consistency engine is a stub', drop the two T3 bullet points from
  the next-steps section.

Verification: /healthz lists 18 tools (was 14), all 4 new tools return
{"violations": [], "count": 0}, full test.sh passes.
2026-06-16 14:21:48 +00:00

116 lines
3.8 KiB
Python

"""
trade plugin — Postgres-backed operational data.
Demonstrates the polyglot pattern: a domain type (trade log entry) that
isn't a core ontology concept, backed by Postgres because it's
high-volume time-series data, queryable through the same MCP gateway.
"""
from server import get_postgres, REGISTRY
def _q(sql, params=None, fetch=True):
conn = get_postgres()
try:
with conn.cursor() as cur:
cur.execute(sql, params or ())
if fetch and cur.description:
cols = [d[0] for d in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
return []
finally:
conn.close()
@REGISTRY.tool(
name="log_trade",
description="Record a trade. Buyer and seller must exist as Person or Faction nodes (call entity_context to verify).",
input_schema={
"type": "object",
"properties": {
"buyer_id": {"type": "string"},
"seller_id": {"type": "string"},
"item_id": {"type": "string"},
"quantity": {"type": "number"},
"unit": {"type": "string", "default": "gp"},
"unit_price": {"type": "number"},
"location_id": {"type": "string"},
"in_fiction_time": {"type": "string"},
"notes": {"type": "string"},
},
"required": ["buyer_id", "seller_id", "item_id", "quantity", "unit_price"],
},
)
def log_trade(args):
total = float(args["quantity"]) * float(args["unit_price"])
_q("""
INSERT INTO trade_log
(buyer_id, seller_id, item_id, quantity, unit, unit_price, total_price, location_id, in_fiction_time, notes)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
args["buyer_id"], args["seller_id"], args["item_id"],
args["quantity"], args.get("unit", "gp"), args["unit_price"], total,
args.get("location_id"), args.get("in_fiction_time"), args.get("notes"),
), fetch=False)
return {"logged": True, "total_price": total}
@REGISTRY.tool(
name="trades_by_buyer",
description="List trades where a given entity was the buyer, most recent first.",
input_schema={
"type": "object",
"properties": {
"buyer_id": {"type": "string"},
"limit": {"type": "integer", "default": 10},
},
"required": ["buyer_id"],
},
)
def trades_by_buyer(args):
rows = _q("""
SELECT id, occurred_at, seller_id, item_id, quantity, unit, unit_price, total_price, location_id, notes
FROM trade_log
WHERE buyer_id = %s
ORDER BY occurred_at DESC
LIMIT %s
""", (args["buyer_id"], args.get("limit", 10)))
return {"buyer": args["buyer_id"], "count": len(rows), "trades": rows}
@REGISTRY.tool(
name="market_price",
description="Average price for an item_id over the last N records. Computed from the trade log.",
input_schema={
"type": "object",
"properties": {
"item_id": {"type": "string"},
"limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 1000},
},
"required": ["item_id"],
},
)
def market_price(args):
rows = _q("""
SELECT unit_price, total_price, occurred_at
FROM trade_log
WHERE item_id = %s
ORDER BY occurred_at DESC
LIMIT %s
""", (args["item_id"], args.get("limit", 100)))
if not rows:
return {"item_id": args["item_id"], "sample_size": 0}
prices = [float(r["unit_price"]) for r in rows]
return {
"item_id": args["item_id"],
"sample_size": len(prices),
"avg_unit_price": round(sum(prices) / len(prices), 2),
"min_unit_price": min(prices),
"max_unit_price": max(prices),
"most_recent": rows[0]["occurred_at"].isoformat() if rows else None,
}
def register(registry):
pass