223 lines
8.2 KiB
Python
223 lines
8.2 KiB
Python
"""
|
|
Damascus Orchestrator E2E test configuration.
|
|
|
|
Tests run against the REAL Postgres state in the docker-compose stack.
|
|
Each test resets the work_items table before running, then exercises
|
|
one phase transition and asserts on the resulting row state.
|
|
|
|
Why real DB, not mocks:
|
|
- The contract is about the row state, the events_outbox, the cost_ledger
|
|
- Mocking Postgres would let us test our mocks, not the system
|
|
- The orchestrator's value is in the row-level atomicity, not the Python code
|
|
|
|
Test isolation: every test calls reset_state() in a fixture, which:
|
|
1. TRUNCATEs work_items, human_issues, cost_ledger, events_outbox
|
|
2. Inserts a single known story in a known phase
|
|
3. Returns the row id
|
|
|
|
TEST DATABASE ISOLATION (added 2026-06-26):
|
|
The pytest suite must NEVER TRUNCATE the production orchestrator DB at
|
|
127.0.0.1:5432. By default the suite connects to the separate
|
|
`db-test` compose service (port 5433 host / 5432 container, database
|
|
`damascus_test`, separate volume `dbtestdata`). The `clean_state`
|
|
autouse fixture runs `reset_state()` against this database only.
|
|
|
|
To run tests against the production DB (rare — only for diagnosing
|
|
issues that don't repro against db-test), set `DAMASCUS_ALLOW_TEST_RESET=1`.
|
|
The `prod-safety-guard` block in `reset_state()` will then allow it.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import psycopg
|
|
import pytest
|
|
from psycopg.rows import dict_row
|
|
|
|
|
|
# Absolute paths under the damascus-orchestrator root directory.
DAMASCUS_ROOT = Path("/root") / "damascus-orchestrator"
WIKI_ROOT = DAMASCUS_ROOT.joinpath("wiki")
SPECS_DIR = DAMASCUS_ROOT.joinpath("specs", "wh40k-pc")
|
|
|
|
# The production DB is recognised by its FULL DSN: (host, port, user, dbname).
# Matching on the whole tuple is required because prod and test share the
# same port number (5432) in different network contexts (host-bound vs
# in-container), so the port alone cannot tell them apart.
# A DSN that differs from every entry below in ANY field is treated as
# "not production" — including a prod host/port combined with different
# credentials or a different database name.
_PROD_HOST_ALIASES = (
    "127.0.0.1",                   # host-loopback to prod
    "localhost",                   # same, via localhost
    "db",                          # in-container via compose
    "damascus-orchestrator-db-1",  # by container name
)
_PROD_DSNS = frozenset(
    (alias, 5432, "damascus", "damascus") for alias in _PROD_HOST_ALIASES
)
|
|
|
|
# Postgres connection settings for the test suite (matches docker-compose env).
#
# Defaults target the `db-test` compose service on its dedicated port
# (5433 host / 5432 container): the TEST DB, with its own volume, its own
# credentials and its own database. With the defaults, the production DB
# at 127.0.0.1:5432 is never touched.
#
# From the HOST (pytest on the dev machine) the DB is reached at
# 127.0.0.1:5433 through compose's `ports:` mapping. The orchestrator
# container reaches the same DB at `db-test:5432` via the compose network.
#
# Each setting is resolved in this order:
#   1. DAMASCUS_TEST_PG_<NAME>  (if set and non-empty)
#   2. DAMASCUS_PG_<NAME>       (if set)
#   3. the built-in test default
def _pg_env(suffix, default):
    """Resolve one connection setting from the environment (see order above)."""
    return os.environ.get(f"DAMASCUS_TEST_PG_{suffix}") or os.environ.get(
        f"DAMASCUS_PG_{suffix}", default
    )


DB_CONFIG = {
    "host": _pg_env("HOST", "127.0.0.1"),
    "port": int(_pg_env("PORT", "5433")),
    "user": _pg_env("USER", "damascus_test"),
    "password": _pg_env("PASSWORD", "damascus_test"),
    "dbname": _pg_env("DB", "damascus_test"),
    "autocommit": False,
}
|
|
|
|
|
|
def get_conn():
    """Open a new psycopg connection using ``DB_CONFIG``.

    Rows are produced with the ``dict_row`` row factory. The open
    connection is returned as-is; this function does not close it.
    """
    connect_kwargs = dict(DB_CONFIG, row_factory=dict_row)
    return psycopg.connect(**connect_kwargs)
|
|
|
|
|
|
def run_cycle_in_container():
    """Run one ``damascus cycle`` inside the ``orchestrator`` compose service.

    The command is executed through ``docker compose exec -T`` against the
    ``docker-compose.yml`` under ``DAMASCUS_ROOT``, with output captured as
    text and a 600-second timeout. A non-zero exit status is not raised;
    it is reported through the returned code.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple.
    """
    compose_file = str(DAMASCUS_ROOT / "docker-compose.yml")
    command = [
        "docker", "compose", "-f", compose_file,
        "exec", "-T", "orchestrator", "damascus", "cycle",
    ]
    completed = subprocess.run(command, capture_output=True, text=True, timeout=600)
    return completed.stdout, completed.stderr, completed.returncode
|
|
|
|
|
|
def _prod_safety_guard():
    """Refuse to TRUNCATE the production DB unless explicitly opted in.

    The configured DSN is reduced to a (host, port, user, dbname) tuple and
    looked up in ``_PROD_DSNS``. Only a match on all four fields counts as
    production:
      - host-loopback prod (127.0.0.1:5432/damascus/damascus) → guarded
      - in-container prod (db:5432/damascus/damascus) → guarded
      - test DB in container (db-test:5432/damascus_test/damascus_test) → safe
      - test DB from host (127.0.0.1:5433/damascus_test/damascus_test) → safe

    NOTE: a production host/port combined with different credentials or a
    different dbname does NOT match and is therefore not guarded here.

    The host is stripped and lower-cased before the lookup, because host
    names are case-insensitive: ``LOCALHOST`` or ``DB`` reach the same
    server as ``localhost`` / ``db`` and must not slip past the guard.

    Behaviour when the DSN is production:
      - ``DAMASCUS_ALLOW_TEST_RESET=1``: emit a RuntimeWarning and return,
        permitting the wipe.
      - otherwise: emit a RuntimeWarning and call ``pytest.skip()`` so the
        caller never reaches the TRUNCATE.
    """
    import warnings

    # Raw tuple is kept for messages so they show exactly what was configured.
    dsn = (DB_CONFIG["host"], DB_CONFIG["port"], DB_CONFIG["user"], DB_CONFIG["dbname"])
    # Normalised tuple is used only for the membership test.
    lookup_key = (str(dsn[0]).strip().lower(),) + dsn[1:]

    if lookup_key not in _PROD_DSNS:
        return  # Not prod (any other combination), proceed

    if os.environ.get("DAMASCUS_ALLOW_TEST_RESET") == "1":
        warnings.warn(
            f"reset_state() running against PRODUCTION DB at {dsn} "
            "because DAMASCUS_ALLOW_TEST_RESET=1. "
            "All work_items, human_issues, cost_ledger, events_outbox, "
            "and coordination_gates rows will be deleted.",
            RuntimeWarning,
            stacklevel=2,
        )
        return

    # Default: skip rather than wipe production.
    warnings.warn(
        f"reset_state() called against PRODUCTION DB at {dsn} — "
        "skipping TRUNCATE. Either (a) unset DAMASCUS_TEST_PG_* so the "
        "default db-test (127.0.0.1:5433/damascus_test/damascus_test) "
        "is used, or (b) set DAMASCUS_ALLOW_TEST_RESET=1 to confirm "
        "intent. pytest.skip()ing this fixture.",
        RuntimeWarning,
        stacklevel=2,
    )
    pytest.skip(
        f"reset_state() refused to TRUNCATE production DB at {dsn}."
    )
|
|
|
|
|
|
def reset_state():
    """Empty the orchestrator tables and restart their identity sequences.

    Runs ``_prod_safety_guard()`` first, which refuses to proceed against a
    known production DB unless DAMASCUS_ALLOW_TEST_RESET=1 is set in the
    environment. Then issues a single
    ``TRUNCATE ... RESTART IDENTITY CASCADE`` over work_items, human_issues,
    cost_ledger, events_outbox and coordination_gates, and commits.
    Called by fixtures before each test.
    """
    _prod_safety_guard()

    truncate_sql = (
        "TRUNCATE work_items, human_issues, cost_ledger, events_outbox, "
        "coordination_gates RESTART IDENTITY CASCADE"
    )
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(truncate_sql)
        connection.commit()
    finally:
        # Always release the connection, even if the TRUNCATE failed.
        connection.close()
|
|
|
|
|
|
def insert_work_item(phase="spec", story_id=None, title="Test story",
                     file_scope=None, budget_cycles=5, project="wh40k-pc"):
    """Insert one row into work_items and return its id.

    Args:
        phase: Value stored in the ``phase`` column.
        story_id: Story identifier; when falsy, a random ``test-<8 hex>``
            id is generated.
        title: Value stored in the ``title`` column.
        file_scope: Value stored in ``file_scope``; when falsy, the string
            ``'["src/test.js"]'`` is used.
        budget_cycles: Value stored in the ``budget_cycles`` column.
        project: Value stored in the ``project`` column.

    Returns:
        The generated row id (a UUID4 string). ``priority`` is always 100.
    """
    insert_sql = """INSERT INTO work_items
                   (id, project, story_id, title, phase, file_scope, budget_cycles, priority)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, 100)"""

    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            new_id = str(uuid.uuid4())
            effective_story = story_id if story_id else f"test-{uuid.uuid4().hex[:8]}"
            effective_scope = file_scope if file_scope else '["src/test.js"]'
            params = (
                new_id, project, effective_story, title, phase,
                effective_scope, budget_cycles,
            )
            cursor.execute(insert_sql, params)
        connection.commit()
        return new_id
    finally:
        connection.close()
|
|
|
|
|
|
def get_row(row_id):
    """Return the work_items row whose id equals ``row_id``.

    The result is whatever ``cursor.fetchone()`` yields for the query
    (``None`` when no row matches).
    """
    query = "SELECT * FROM work_items WHERE id = %s"
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (row_id,))
            record = cursor.fetchone()
        return record
    finally:
        connection.close()
|
|
|
|
|
|
def get_events(row_id):
    """Return every events_outbox row for the given work item, ordered by id."""
    query = "SELECT * FROM events_outbox WHERE work_item_id = %s ORDER BY id"
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (row_id,))
            events = cursor.fetchall()
        return events
    finally:
        connection.close()
|
|
|
|
|
|
def get_cost_rows(row_id):
    """Return every cost_ledger row for the given work item, ordered by id."""
    query = "SELECT * FROM cost_ledger WHERE work_item_id = %s ORDER BY id"
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (row_id,))
            ledger_rows = cursor.fetchall()
        return ledger_rows
    finally:
        connection.close()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def clean_state():
    """Autouse fixture: call ``reset_state()`` before every test.

    There is intentionally no teardown step — the rows a test produced are
    left in place so they can be inspected if the test fails.
    """
    reset_state()
    # Hand control to the test; nothing runs after it on purpose.
    yield