Migrate to Postgres + Taskiq (conform to orchestration plan)
Some checks failed
test / contract-and-unit (pull_request) Failing after 8s

Bring the code to the plan: MySQL→Postgres 16 and cron→Taskiq (Python
BullMQ-equivalent over a Redis broker), with Postgres FOR UPDATE SKIP LOCKED
retained as the atomic claim. The per-tick "claim one item, run one phase"
model is unchanged.

Approved fixes folded in:
- claim_for_merge: delete the call to the non-existent state.claim_for_merge;
  claim order is now review→build→spec (merge happens inside review on pass).
- loop-breaker: a non-pass verdict with attempts>=budget_cycles parks the row
  as `blocked` + opens a human_issue + emits work.blocked (design §5/§16).
- spec_wrong: added to phases.VERDICTS and emitted by refine_spec when the
  spec is missing required sections (routes to spec, not awaiting_human).

Driver: PyMySQL→psycopg3 sync (dict_row cursor, Jsonb() for JSONB). schema.sql
rewritten to PG16 (enums, JSONB, TIMESTAMPTZ, BIGSERIAL, BEFORE UPDATE trigger
replacing MySQL ON UPDATE). cli init guard-creates the DB and applies the whole
schema in one execute().

New src/damascus/tasks.py wires ListQueueBroker + TaskiqScheduler with a
run_cycle task (→ cycle.tick()) on a cron label. Dockerfile CMD runs the
worker; docker-compose adds redis:7 + an orchestrator-scheduler service.

Bugs found and fixed during verification:
- cycle.py/cli.py status file was hardcoded to /data; now uses settings.data_dir.
- redis-py 8.0.0 defaults socket_timeout=5s, which killed idle Taskiq workers
  (indefinite BRPOP + uncaught TimeoutError). Broker now sets socket_timeout=None.
- docker-compose scheduler command pointed at :broker; fixed to :scheduler.
- tasks.py docstring referenced non-existent --concurrency; corrected to
  --max-threadpool-threads.

Verified: schema idempotent against postgres:16; damascus init end-to-end;
19 contract+unit tests green; Taskiq worker kiq path advances a row; Taskiq
scheduler path (no damascus cycle call) drives spec→build→retry→blocked +
human_issue, proving the queue replaces cron and the loop-breaker via the queue.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-06-23 13:24:58 -04:00
parent a1ccb467e5
commit e21a8c1f53
17 changed files with 523 additions and 199 deletions

View File

@@ -9,6 +9,31 @@ on:
jobs:
contract-and-unit:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16
env:
POSTGRES_USER: damascus
POSTGRES_PASSWORD: damascus
POSTGRES_DB: damascus
ports:
- "5432:5432"
options: >-
--health-cmd "pg_isready -U damascus -d damascus"
--health-interval 5s
--health-timeout 5s
--health-retries 20
env:
DAMASCUS_ROOT: ${{ github.workspace }}
DAMASCUS_SCHEMA_PATH: ${{ github.workspace }}/schema.sql
DAMASCUS_PG_HOST: 127.0.0.1
DAMASCUS_PG_PORT: "5432"
DAMASCUS_PG_USER: damascus
DAMASCUS_PG_PASSWORD: damascus
DAMASCUS_PG_DB: damascus
steps:
- uses: actions/checkout@v4
@@ -20,11 +45,16 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pymysql
pip install pytest psycopg[binary] pydantic pydantic-settings click rich pyyaml httpx
pip install -e .
- name: Run contract + unit tests (no docker stack needed)
- name: Apply schema
run: |
damascus init
- name: Run contract + unit tests
run: |
pytest tests/contract/ tests/unit/ -v
# E2E tests are docker-stack-dependent and are run separately.
# See scripts/verify.sh for the full local E2E run.
# See bin/run-cycle.sh + docker compose for the full local E2E run.

View File

@@ -46,4 +46,7 @@ ENV PYTHONUNBUFFERED=1 \
DAMASCUS_WORKSPACE_DIR=/workspace
EXPOSE 9100
CMD ["python", "-m", "damascus.cycle"]
# Taskiq worker is the automatic trigger (design doc §13). `--concurrency N`
# is the global concurrency cap (§10); set via compose. The scheduler runs
# as a separate compose service. `damascus cycle` is the manual one-shot.
CMD ["taskiq", "worker", "damascus.tasks:broker"]

View File

@@ -5,27 +5,35 @@ A self-hosted work-item state machine that autonomously advances stories through
## Quick start
```bash
# Apply schema
docker compose exec db mysql -udamascus -pdamascus damascus < schema.sql
# Start the stack (Postgres + Redis + Taskiq worker + scheduler)
docker compose up -d --build
# Start the stack
docker compose up -d
# Apply schema (creates the DB + all tables/types/triggers)
docker compose exec orchestrator damascus init
# One cycle
# Manual one-shot cycle (operators / E2E). The Taskiq worker is the
# automatic trigger — you do not normally need to run this by hand.
docker compose exec orchestrator damascus cycle
# Run the E2E suite
pip install pytest pymysql
# External concurrency view
docker compose exec orchestrator damascus status
# Run the contract + unit suite (needs a Postgres on 127.0.0.1:5432)
pip install -e . pytest psycopg[binary]
pytest tests/contract/ tests/unit/ -v
# Run the E2E suite (needs the docker-compose stack up)
pip install pytest psycopg
pytest tests/ -v
```
## What this repo contains
- `src/damascus/` — the Python package (cycle, phases, state, git_ops, llm, cli, relay, wiki, config)
- `tests/` — the E2E suite (61 tests; contract + unit + E2E; the executable form of the design doc)
- `schema.sql`MySQL schema (work_items, coordination_gates, human_issues, cost_ledger, events_outbox)
- `docker-compose.yml` — the stack (db + orchestrator + sidecar-status)
- `Dockerfile` — the orchestrator image (Python 3.12 + git + mysql-client + claude-code + BMAD + LLM-wiki + ollama binary)
- `src/damascus/` — the Python package (cycle, phases, state, git_ops, llm, cli, relay, wiki, tasks, config)
- `tests/` — the suite (contract + unit + E2E; the executable form of the design doc)
- `schema.sql`Postgres 16 schema (work_items, coordination_gates, human_issues, cost_ledger, events_outbox)
- `docker-compose.yml` — the stack (db + redis + orchestrator worker + scheduler + sidecar-status)
- `Dockerfile` — the orchestrator image (Python 3.12 + git + claude-code + BMAD + LLM-wiki + ollama binary)
- `.gitea/workflows/` — CI
- `skills/SKILL.md` — operator-facing skill
@@ -37,7 +45,18 @@ pytest tests/ -v
## Architecture (one paragraph)
MySQL is the source of truth on work-item state. Each story row flows through three loops: spec-refiner (LLM via LiteLLM writes an implementable spec), code-builder (Claude Code via LiteLLM writes the code in a git worktree, opens a real Gitea PR), reviewer (re-runs the spec's test command, gates on objective pass/fail, merges via Gitea API on pass). Atomic claim uses `SELECT ... FOR UPDATE SKIP LOCKED`. Every phase transition emits a typed verdict and an `events_outbox` row. An attempt budget guarantees termination. The human is async — open questions become `human_issues` rows, never synchronous blocks.
Postgres is the source of truth on work-item state (design §3). Each story row
flows through three loops: spec-refiner (LLM via LiteLLM writes an implementable
spec), code-builder (Claude Code via LiteLLM writes the code in a git worktree,
opens a real Gitea PR), reviewer (re-runs the spec's test command, gates on
objective pass/fail, merges via Gitea API on pass). Atomic claim uses
`SELECT ... FOR UPDATE SKIP LOCKED`. Taskiq (a BullMQ-equivalent Python queue,
§13) with a Redis broker is the recurring trigger; the worker's `--concurrency N`
is the global concurrency cap (§10). Every phase transition emits a typed
verdict and an `events_outbox` row in the same transaction. An attempt budget
guarantees termination — a non-pass verdict that exhausts the budget parks the
item as `blocked` and opens a `human_issue` (§5/§16). The human is async — open
questions become `human_issues` rows, never synchronous blocks.
Full design + contracts in the wiki: `kaykayyali/damascus-wiki`.

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env bash
# Wrapper that runs the cycle. Designed to be invoked from a host cron:
# * * * * * /root/damascus-orchestrator/bin/run-cycle.sh >> /data/logs/cron.log 2>&1
# Manual one-shot tick. The AUTOMATIC trigger is the Taskiq worker + scheduler
# (see docker-compose.yml), so you do NOT normally need cron. Keep this for
# operator manual ticks and the E2E suite:
# docker compose exec -T orchestrator damascus cycle
set -euo pipefail
cd "$(dirname "$0")/.."
docker compose exec -T orchestrator damascus cycle

View File

@@ -1,19 +1,27 @@
services:
db:
image: mysql:8.4
image: postgres:16
restart: unless-stopped
environment:
MYSQL_ROOT_PASSWORD: rootpw
MYSQL_DATABASE: damascus
MYSQL_USER: damascus
MYSQL_PASSWORD: damascus
POSTGRES_USER: damascus
POSTGRES_PASSWORD: damascus
POSTGRES_DB: damascus
volumes:
- dbdata:/var/lib/mysql
- dbdata:/var/lib/postgresql/data
# Expose DB only on loopback for the E2E test suite (host runs pytest)
ports:
- "127.0.0.1:3307:3306"
- "127.0.0.1:5432:5432"
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-u", "damascus", "-pdamascus"]
test: ["CMD", "pg_isready", "-U", "damascus", "-d", "damascus"]
interval: 5s
timeout: 5s
retries: 20
redis:
image: redis:7
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 20
@@ -25,14 +33,19 @@ services:
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
extra_hosts:
- "host.docker.internal:host-gateway" # reach host's LiteLLM / Gitea
environment:
DAMASCUS_MYSQL_HOST: db
DAMASCUS_MYSQL_PORT: "3306"
DAMASCUS_MYSQL_USER: damascus
DAMASCUS_MYSQL_PASSWORD: damascus
DAMASCUS_MYSQL_DB: damascus
DAMASCUS_PG_HOST: db
DAMASCUS_PG_PORT: "5432"
DAMASCUS_PG_USER: damascus
DAMASCUS_PG_PASSWORD: damascus
DAMASCUS_PG_DB: damascus
# Taskiq broker transport (BullMQ-equivalent)
DAMASCUS_REDIS_URL: redis://redis:6379
# LLM proxy on the host (default port 4000)
DAMASCUS_LLM_BASE_URL: http://host.docker.internal:4000
@@ -65,8 +78,26 @@ services:
- /home/kaykayyali/_bmad:/opt/damascus/bmad/_kit:ro
# E2E test suite (read-only; tests run from the host)
- ./tests:/opt/damascus/tests:ro
command: ["sleep", "infinity"]
# To run on a cron: change the command to a loop. See bin/run-cycle.sh.
# Taskiq worker — the global concurrency cap (design doc §10). For sync
# tasks (run_cycle), --max-threadpool-threads is the parallelism knob.
command: ["taskiq", "worker", "damascus.tasks:broker", "--max-threadpool-threads", "1"]
orchestrator-scheduler:
image: damascus-orchestrator:latest
restart: unless-stopped
depends_on:
redis:
condition: service_healthy
environment:
DAMASCUS_REDIS_URL: redis://redis:6379
DAMASCUS_PG_HOST: db
DAMASCUS_PG_PORT: "5432"
DAMASCUS_PG_USER: damascus
DAMASCUS_PG_PASSWORD: damascus
DAMASCUS_PG_DB: damascus
# Exactly one scheduler enqueues the recurring run_cycle task on the cron.
# The path must point at the TaskiqScheduler instance, not the broker.
command: ["taskiq", "scheduler", "damascus.tasks:scheduler"]
sidecar-status:
image: python:3.12-slim
@@ -85,4 +116,4 @@ volumes:
dbdata:
orchdata:
worktrees:
projects:
projects:

View File

@@ -1,10 +1,13 @@
[project]
name = "damascus-orchestrator"
version = "0.1.0"
description = "Postgres-style atomic-claim orchestrator with MySQL, Gitea, and a file-based LLM wiki."
description = "Postgres + Taskiq atomic-claim orchestrator with Gitea and a file-based LLM wiki."
requires-python = ">=3.11"
dependencies = [
"PyMySQL>=1.1.0",
"psycopg[binary]>=3.2",
"taskiq>=0.11,<0.13",
"taskiq-redis>=0.4",
"redis>=5",
"httpx>=0.27",
"PyYAML>=6.0",
"pydantic>=2.6",
@@ -22,3 +25,8 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
markers = [
"db: tests that require a live Postgres (skip with -m 'not db' for a fast loop)",
]

View File

@@ -1,10 +1,52 @@
-- Damascus orchestrator — MySQL 8 schema.
-- Damascus orchestrator — PostgreSQL 16 schema.
-- Atomic claim is implemented with `SELECT ... FOR UPDATE SKIP LOCKED`
-- on `work_items`, gated by a per-row `phase` enum.
--
-- Postgres is the authoritative scheduler (design doc §3). This file is
-- applied by `damascus init`, which creates the `damascus` database first
-- (CREATE DATABASE cannot run inside a transaction/DO block) and then
-- executes this whole file in a single execute() call.
CREATE DATABASE IF NOT EXISTS damascus
CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE damascus;
-- --- enum types (idempotent; CREATE TYPE has no IF NOT EXISTS) ------------
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'work_item_phase') THEN
CREATE TYPE work_item_phase AS ENUM
('spec','build','review','merged','blocked','awaiting_human');
END IF;
END $$;
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'gate_kind') THEN
CREATE TYPE gate_kind AS ENUM ('and','or','first');
END IF;
END $$;
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'issue_status') THEN
CREATE TYPE issue_status AS ENUM ('open','answered','resolved');
END IF;
END $$;
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'verdict_kind') THEN
-- design doc §5: pass | tests_failed | rebase_conflict |
-- spec_ambiguous | spec_wrong | no_pr
CREATE TYPE verdict_kind AS ENUM
('pass','tests_failed','rebase_conflict','spec_ambiguous','spec_wrong','no_pr');
END IF;
END $$;
-- --- shared trigger function: keep updated_at honest (no ON UPDATE in PG) --
CREATE OR REPLACE FUNCTION fn_touch_updated_at() RETURNS trigger AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- --- tables ---------------------------------------------------------------
-- work_items: one row per story, across all projects.
CREATE TABLE IF NOT EXISTS work_items (
@@ -12,37 +54,42 @@ CREATE TABLE IF NOT EXISTS work_items (
project VARCHAR(64) NOT NULL,
story_id VARCHAR(128) NOT NULL,
title VARCHAR(255) NOT NULL DEFAULT '',
phase ENUM('spec','build','review','merged','blocked','awaiting_human')
NOT NULL DEFAULT 'spec',
file_scope JSON NOT NULL,
phase work_item_phase NOT NULL DEFAULT 'spec',
file_scope JSONB NOT NULL DEFAULT '[]'::jsonb,
attempts INT NOT NULL DEFAULT 0,
budget_cycles INT NOT NULL DEFAULT 5,
priority INT NOT NULL DEFAULT 100,
base_commit VARCHAR(64) DEFAULT NULL,
branch VARCHAR(128) DEFAULT NULL,
pr_url VARCHAR(512) DEFAULT NULL,
last_verdict ENUM('pass','tests_failed','rebase_conflict','spec_ambiguous','spec_wrong','no_pr')
DEFAULT NULL,
last_feedback JSON DEFAULT NULL,
last_verdict verdict_kind DEFAULT NULL,
last_feedback JSONB DEFAULT NULL,
spec_path VARCHAR(512) DEFAULT NULL,
wiki_pin VARCHAR(64) DEFAULT NULL,
claimed_by VARCHAR(64) DEFAULT NULL,
claimed_at TIMESTAMP NULL DEFAULT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
merged_at TIMESTAMP NULL DEFAULT NULL,
UNIQUE KEY uniq_project_story (project, story_id),
KEY idx_phase_priority (phase, priority, updated_at)
) ENGINE=InnoDB;
claimed_at TIMESTAMPTZ DEFAULT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
merged_at TIMESTAMPTZ DEFAULT NULL,
UNIQUE (project, story_id)
);
DROP TRIGGER IF EXISTS trg_work_items_touch ON work_items;
CREATE TRIGGER trg_work_items_touch BEFORE UPDATE ON work_items
FOR EACH ROW EXECUTE FUNCTION fn_touch_updated_at();
CREATE INDEX IF NOT EXISTS idx_work_items_phase_priority
ON work_items (phase, priority, updated_at);
-- coordination_gates: AND/OR/FIRST dependency edges between stories.
CREATE TABLE IF NOT EXISTS coordination_gates (
parent_id CHAR(36) NOT NULL,
child_id CHAR(36) NOT NULL,
kind ENUM('and','or','first') NOT NULL DEFAULT 'and',
PRIMARY KEY (parent_id, child_id),
KEY idx_child (child_id)
) ENGINE=InnoDB;
kind gate_kind NOT NULL DEFAULT 'and',
PRIMARY KEY (parent_id, child_id)
);
CREATE INDEX IF NOT EXISTS idx_coord_gates_child ON coordination_gates (child_id);
-- human_issues: async channel for open questions / approvals.
CREATE TABLE IF NOT EXISTS human_issues (
@@ -50,16 +97,17 @@ CREATE TABLE IF NOT EXISTS human_issues (
work_item_id CHAR(36) NOT NULL,
question TEXT NOT NULL,
answer TEXT DEFAULT NULL,
status ENUM('open','answered','resolved') NOT NULL DEFAULT 'open',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
answered_at TIMESTAMP NULL DEFAULT NULL,
KEY idx_status (status, created_at),
KEY idx_item (work_item_id)
) ENGINE=InnoDB;
status issue_status NOT NULL DEFAULT 'open',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
answered_at TIMESTAMPTZ DEFAULT NULL
);
CREATE INDEX IF NOT EXISTS idx_human_issues_status ON human_issues (status, created_at);
CREATE INDEX IF NOT EXISTS idx_human_issues_item ON human_issues (work_item_id);
-- cost_ledger: per-call token + dollar spend.
CREATE TABLE IF NOT EXISTS cost_ledger (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
work_item_id CHAR(36) DEFAULT NULL,
project VARCHAR(64) DEFAULT NULL,
phase VARCHAR(32) DEFAULT NULL,
@@ -67,25 +115,29 @@ CREATE TABLE IF NOT EXISTS cost_ledger (
input_tokens INT DEFAULT NULL,
output_tokens INT DEFAULT NULL,
usd DECIMAL(10,6) DEFAULT NULL,
recorded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_item (work_item_id),
KEY idx_recorded (recorded_at)
) ENGINE=InnoDB;
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- events_outbox: per-cycle transition log (replaces a "drainer" daemon;
-- the cycle function writes a row per transition in the same txn).
CREATE INDEX IF NOT EXISTS idx_cost_ledger_item ON cost_ledger (work_item_id);
CREATE INDEX IF NOT EXISTS idx_cost_ledger_recorded ON cost_ledger (recorded_at);
-- events_outbox: per-cycle transition log. Written in the same transaction
-- as the state update (transactional outbox, design doc §8). A drainer
-- delivers these to the overseer with idempotency keys (Phase 3).
CREATE TABLE IF NOT EXISTS events_outbox (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
work_item_id CHAR(36) DEFAULT NULL,
kind VARCHAR(64) NOT NULL,
payload JSON NOT NULL,
delivered TINYINT(1) NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_delivered (delivered, created_at)
) ENGINE=InnoDB;
payload JSONB NOT NULL,
delivered BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_events_outbox_delivered
ON events_outbox (delivered, created_at);
-- A handy view for the external concurrency tracker.
CREATE OR REPLACE VIEW v_active_claims AS
SELECT id, project, story_id, phase, claimed_by, claimed_at, updated_at
FROM work_items
WHERE phase IN ('build','review','spec');
WHERE phase IN ('spec','build','review');

View File

@@ -83,11 +83,21 @@ docker compose exec orchestrator damascus status # external view
3. Ingest stories: `damascus ingest --project <project>`.
4. The next cycle picks the first ready story and runs `spec → build → review → merged`.
## Local cron
## Recurring trigger (Taskiq)
```cron
* * * * * /root/damascus-orchestrator/bin/run-cycle.sh >> /data/logs/cron.log 2>&1
The cycle is driven by Taskiq (the Python BullMQ-equivalent) over a Redis
broker, not host cron. `src/damascus/tasks.py` wires a `ListQueueBroker` plus a
`TaskiqScheduler` that enqueues `run_cycle` (→ `cycle.tick()`) on a cron label.
Run one worker and one scheduler:
```bash
taskiq worker damascus.tasks:broker --max-threadpool-threads $N # consumes + runs
taskiq scheduler damascus.tasks:scheduler # enqueues on the cron
```
Concurrency = number of cron entries. Each entry uses a different
`DAMASCUS_CONCURRENCY_ID` (set in the environment you launch `docker compose` with).
`damascus cycle` (and `bin/run-cycle.sh`) remain as the deterministic one-shot
operator path — they call `cycle.tick()` directly, bypassing the queue.
Concurrency = the worker's `--max-threadpool-threads N` (where
`N = DAMASCUS_MAX_CONCURRENT`). `FOR UPDATE SKIP LOCKED` makes N parallel ticks
safe. Each host runs its own worker with a distinct `DAMASCUS_CONCURRENCY_ID`.

View File

@@ -10,6 +10,7 @@ from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
from psycopg.types.json import Jsonb
from . import state
from .config import settings
@@ -159,7 +160,7 @@ def start_cmd(item_id):
console.print(f"[yellow]warning: item is in phase {item['phase']}, not 'spec'.[/yellow]")
cur.execute("UPDATE work_items SET phase='build', updated_at=NOW() WHERE id=%s", (item_id,))
cur.execute("INSERT INTO events_outbox (work_item_id, kind, payload) VALUES (%s,'manual.start',%s)",
(item_id, "{}"))
(item_id, Jsonb({})))
console.print(f"[green]started[/green] {item['project']}/{item['story_id']}")
@@ -177,7 +178,7 @@ def complete_cmd(item_id, summary):
cur.execute("UPDATE work_items SET phase='merged', updated_at=NOW() WHERE id=%s", (item_id,))
cur.execute(
"INSERT INTO events_outbox (work_item_id, kind, payload) VALUES (%s,'manual.complete',%s)",
(item_id, json.dumps({"summary": summary})),
(item_id, Jsonb({"summary": summary})),
)
console.print(f"[green]completed[/green] {item['project']}/{item['story_id']}")
@@ -190,7 +191,7 @@ def block_cmd(item_id, reason):
cur.execute("UPDATE work_items SET phase='blocked', updated_at=NOW() WHERE id=%s", (item_id,))
cur.execute(
"INSERT INTO events_outbox (work_item_id, kind, payload) VALUES (%s,'manual.block',%s)",
(item_id, json.dumps({"reason": reason})),
(item_id, Jsonb({"reason": reason})),
)
console.print(f"[yellow]blocked[/yellow] {item_id}")
@@ -219,7 +220,7 @@ def answer_cmd(issue_id, answer_text):
WHERE id=%s AND phase='awaiting_human'""", (wid,))
cur.execute("""INSERT INTO events_outbox (work_item_id, kind, payload)
VALUES (%s,'human.answer',%s)""",
(wid, json.dumps({"issue_id": issue_id, "answer": answer_text})))
(wid, Jsonb({"issue_id": issue_id, "answer": answer_text})))
console.print(f"[green]answered[/green] {issue_id}; work item {wid[:8]} returned to spec.")
@@ -262,7 +263,7 @@ def ingest_cmd(project, dry_run):
@cli.command("status")
def status_cmd():
"""Show what's currently claimed, the active concurrency file, and counts by phase."""
p = Path("/data/status/active.json")
p = settings.data_dir / "status" / "active.json"
if p.exists():
data = json.loads(p.read_text())
console.print(f"concurrency_id: {data['concurrency_id']}")
@@ -297,23 +298,39 @@ def cycle_cmd():
@cli.command("init")
@click.pass_context
def init_cmd(ctx):
"""Run schema.sql against the configured MySQL."""
schema = Path("/opt/damascus/schema.sql")
"""Run schema.sql against the configured Postgres.
CREATE DATABASE cannot run inside a transaction or a DO block, so we
connect to the maintenance DB (`postgres`), guard-create the target DB,
reconnect to it, and execute the whole schema file in a single call —
psycopg3 parses multi-statement SQL including `DO $$ ... $$` blocks."""
import psycopg
schema = settings.schema_path
if not schema.exists():
console.print(f"[red]schema.sql not found at {schema}[/red]")
sys.exit(1)
sql = schema.read_text()
# Connect WITHOUT a database (we create it first)
import pymysql
conn = pymysql.connect(host=settings.mysql_host, port=settings.mysql_port,
user=settings.mysql_user, password=settings.mysql_password)
# 1. Connect to the maintenance DB and guard-create the target database.
admin = psycopg.connect(
host=settings.pg_host, port=settings.pg_port,
user=settings.pg_user, password=settings.pg_password,
dbname="postgres", autocommit=True,
)
try:
with admin.cursor() as cur:
cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (settings.pg_db,))
if not cur.fetchone():
cur.execute(f'CREATE DATABASE "{settings.pg_db}"')
console.print(f"[green]created database {settings.pg_db}[/green]")
finally:
admin.close()
# 2. Connect to the target DB and apply the schema in one execute().
conn = psycopg.connect(**settings.psycopg_kwargs)
try:
with conn.cursor() as cur:
for stmt in sql.split(";"):
s = stmt.strip()
if not s:
continue
cur.execute(s)
cur.execute(sql)
conn.commit()
finally:
conn.close()

View File

@@ -9,12 +9,15 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="DAMASCUS_", env_file=None, extra="ignore")
# MySQL
mysql_host: str = "db"
mysql_port: int = 3306
mysql_user: str = "damascus"
mysql_password: str = "damascus"
mysql_db: str = "damascus"
# Postgres — the authoritative scheduler (design doc §3)
pg_host: str = "db"
pg_port: int = 5432
pg_user: str = "damascus"
pg_password: str = "damascus"
pg_db: str = "damascus"
# Redis — the Taskiq broker transport (BullMQ-equivalent, design doc §13)
redis_url: str = "redis://redis:6379"
# LLM (LiteLLM proxy)
llm_base_url: str = "http://host.docker.internal:4000"
@@ -28,6 +31,7 @@ class Settings(BaseSettings):
# Filesystem
data_dir: Path = Path("/data")
workspace_dir: Path = Path("/workspace")
schema_path: Path = Path("/opt/damascus/schema.sql")
bmad_dir: Path = Path("/opt/damascus/bmad")
wiki_dir: Path = Path("/opt/damascus/llm-wiki")
claude_bin: str = "/usr/local/bin/claude"
@@ -61,19 +65,20 @@ class Settings(BaseSettings):
@property
def dsn(self) -> str:
return (
f"mysql+pymysql://{self.mysql_user}:{self.mysql_password}"
f"@{self.mysql_host}:{self.mysql_port}/{self.mysql_db}?charset=utf8mb4"
f"postgresql://{self.pg_user}:{self.pg_password}"
f"@{self.pg_host}:{self.pg_port}/{self.pg_db}"
)
@property
def pymysql_kwargs(self) -> dict:
def psycopg_kwargs(self) -> dict:
"""Connection args for psycopg3 sync connect(). row_factory is set
per-cursor (in state.transaction), not on the connection."""
return dict(
host=self.mysql_host,
port=self.mysql_port,
user=self.mysql_user,
password=self.mysql_password,
database=self.mysql_db,
charset="utf8mb4",
host=self.pg_host,
port=self.pg_port,
user=self.pg_user,
password=self.pg_password,
dbname=self.pg_db,
autocommit=False,
)

View File

@@ -5,7 +5,6 @@ import json
import logging
import time
from datetime import datetime, timezone
from pathlib import Path
from . import phases, relay, state, wiki
from .config import settings
@@ -22,7 +21,7 @@ def _log_line(payload: dict) -> None:
def _write_status_file(active: list[dict]) -> None:
"""External concurrency tracker: a JSON file the user can `cat`."""
p = Path("/data/status/active.json")
p = settings.data_dir / "status" / "active.json"
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(json.dumps({
"concurrency_id": settings.concurrency_id,
@@ -57,14 +56,15 @@ def tick() -> dict:
active = _active_claims(cur)
_write_status_file(active)
# 1. Pick the next work item. Order matters:
# - merge first (in-flight reviews that already passed the test re-run)
# - review (rows that have a pr_url and need a re-test)
# 1. Pick the next work item. Order matters — drain what's closest
# to done first:
# - review (rows that have a pr_url and need a re-test + merge)
# - build (rows with a spec, awaiting the actual code work)
# - spec (everything else, needs a spec written)
# There is no separate `merge` phase: review transitions to
# `merged` on a pass verdict (see _next_phase_on_verdict).
item = (
state.claim_for_merge(cur)
or state.claim_for_review(cur)
state.claim_for_review(cur)
or state.claim_for_build(cur)
or state.claim_for_spec(cur)
)
@@ -87,7 +87,13 @@ def tick() -> dict:
except Exception as e: # noqa: BLE001
log.exception("phase error")
result = {"verdict": "tests_failed", "feedback": {"error": str(e)[:500]}}
target_phase = item["phase"] # stay put, budget will trip
# Stay put and let the budget loop-breaker park it — but if this
# crash was the attempt that exhausted the budget, block now so
# the row isn't silently skipped by the claim forever.
if item["attempts"] >= item["budget_cycles"]:
target_phase = "blocked"
else:
target_phase = item["phase"]
# 3. Apply the verdict. Forward pr_url/branch/base_commit into the
# row so the review phase can verify the build actually produced
@@ -109,6 +115,25 @@ def tick() -> dict:
"from": item["phase"], "to": target_phase,
"verdict": result["verdict"], "feedback": verdict_feedback,
})
# 3b. Loop-breaker: when a non-pass verdict exhausts the attempt
# budget, the item is parked as `blocked` and surfaced to the
# human via a human_issue (design doc §5 / §16). pass is exempt
# (attempts are not consumed on success).
if target_phase == "blocked":
issue_id = state.open_human_issue(
cur, item["id"],
f"[{item['project']}/{item['story_id']}] blocked after "
f"{item['attempts']}/{item['budget_cycles']} attempts "
f"({result['verdict']}): {verdict_feedback}",
)
state.emit_event(cur, item["id"], "work.blocked", {
"verdict": result["verdict"],
"attempts": item["attempts"],
"budget_cycles": item["budget_cycles"],
"issue_id": issue_id,
"feedback": verdict_feedback,
})
summary["transition"] = {
"from": item["phase"], "to": target_phase,
"verdict": result["verdict"],
@@ -138,10 +163,18 @@ def _next_phase_on_verdict(item: dict, result: dict) -> str:
if item["phase"] == "build":
return "review"
return "build" # spec → build on a clean refine
if v in ("tests_failed", "rebase_conflict"):
# Loop-breaker (design doc §5/§16): a non-pass verdict with the attempt
# budget exhausted parks the item as `blocked`. `attempts` was already
# incremented by the claim, so this is a direct comparison (no off-by-one).
if item["attempts"] >= item["budget_cycles"]:
return "blocked"
if v in ("tests_failed", "rebase_conflict", "no_pr"):
return "build" # retry build
if v == "spec_ambiguous":
return "awaiting_human"
if v in ("spec_ambiguous", "spec_wrong"):
# spec_ambiguous opens a human_issue in refine_spec and the row is
# already set to awaiting_human by the time we get here; spec_wrong
# (internally broken spec) retries the refiner.
return "awaiting_human" if v == "spec_ambiguous" else "spec"
return item["phase"]

View File

@@ -52,7 +52,22 @@ def refine_spec(cur, item: dict) -> dict:
state.record_cost(cur, item["id"], project, "spec", result["model"],
result["input_tokens"], result["output_tokens"], result["usd"])
if "## Ambiguities" in result["text"] and re.search(r"\?\s*$", _section(result["text"], "Ambiguities")):
# Internal consistency check: a spec that is missing any of the sections
# the builder/reviewer depend on is internally broken (not merely
# ambiguous). Route back to the refiner with no human needed (design §5).
text = result["text"] or ""
missing = [s for s in ("Goal", "Acceptance Criteria", "TDD Plan", "Test Command")
if not _section(text, s)]
if missing:
return _verdict("spec_wrong", {
"reason": "spec missing required sections",
"missing": missing, "preview": text[:500],
"model": result["model"],
"input_tokens": result["input_tokens"],
"output_tokens": result["output_tokens"], "usd": result["usd"],
})
if "## Ambiguities" in text and re.search(r"\?\s*$", _section(text, "Ambiguities")):
issue_id = state.open_human_issue(
cur, item["id"], f"[{project}/{story_id}] {title}: {_section(result['text'], 'Ambiguities')}"
)
@@ -330,7 +345,7 @@ def _merge_pr_via_gitea(project: str, pr_url: str) -> dict:
# --- helpers --------------------------------------------------------------
VERDICTS = {"pass", "tests_failed", "rebase_conflict", "spec_ambiguous", "no_pr"}
VERDICTS = {"pass", "tests_failed", "rebase_conflict", "spec_ambiguous", "spec_wrong", "no_pr"}
def _verdict(v: str, feedback: dict) -> dict:

View File

@@ -2,26 +2,26 @@
from __future__ import annotations
import contextlib
import json
import uuid
from typing import Iterator
import pymysql
from pymysql.cursors import DictCursor
import psycopg
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
from .config import settings
def connect() -> pymysql.connections.Connection:
return pymysql.connect(**settings.pymysql_kwargs)
def connect() -> psycopg.Connection:
return psycopg.connect(**settings.psycopg_kwargs)
@contextlib.contextmanager
def transaction() -> Iterator[pymysql.cursors.Cursor]:
"""One transaction, one cursor, autocommit at the end."""
def transaction() -> Iterator[psycopg.Cursor]:
"""One transaction, one dict cursor, autocommit at the end."""
conn = connect()
try:
with conn.cursor(DictCursor) as cur:
with conn.cursor(row_factory=dict_row) as cur:
yield cur
conn.commit()
except Exception:
@@ -59,12 +59,11 @@ def _claim_with_filter(cur, from_phase: str, to_phase: str, where_extra: str = "
f"""UPDATE work_items
SET phase = %s, attempts = attempts + 1,
claimed_by = %s, claimed_at = NOW(), updated_at = NOW()
WHERE id = %s""",
WHERE id = %s
RETURNING *""",
(to_phase, settings.concurrency_id, row["id"]),
)
cur.execute("SELECT * FROM work_items WHERE id = %s", (row["id"],))
claimed = cur.fetchone()
return claimed
return cur.fetchone()
def claim_for_spec(cur) -> dict | None:
@@ -86,10 +85,10 @@ def claim_for_spec(cur) -> dict | None:
"""UPDATE work_items
SET attempts = attempts + 1, claimed_by = %s, claimed_at = NOW(),
updated_at = NOW()
WHERE id = %s""",
WHERE id = %s
RETURNING *""",
(settings.concurrency_id, row["id"]),
)
cur.execute("SELECT * FROM work_items WHERE id = %s", (row["id"],))
return cur.fetchone()
@@ -112,10 +111,10 @@ def claim_for_build(cur) -> dict | None:
"""UPDATE work_items
SET attempts = attempts + 1,
claimed_by = %s, claimed_at = NOW(), updated_at = NOW()
WHERE id = %s""",
WHERE id = %s
RETURNING *""",
(settings.concurrency_id, row["id"]),
)
cur.execute("SELECT * FROM work_items WHERE id = %s", (row["id"],))
return cur.fetchone()
@@ -131,7 +130,7 @@ def claim_for_review(cur) -> dict | None:
# --- writes ---------------------------------------------------------------
def upsert_story(cur, project: str, story_id: str, title: str, file_scope: list[str]) -> str:
def upsert_story(cur, project: str, story_id: str, title: str, file_scope: list) -> str:
"""Create or update a story row. Returns its id."""
cur.execute("SELECT id FROM work_items WHERE project=%s AND story_id=%s", (project, story_id))
existing = cur.fetchone()
@@ -141,7 +140,7 @@ def upsert_story(cur, project: str, story_id: str, title: str, file_scope: list[
cur.execute(
"""INSERT INTO work_items (id, project, story_id, title, file_scope)
VALUES (%s, %s, %s, %s, %s)""",
(new_id, project, story_id, title, json.dumps(file_scope)),
(new_id, project, story_id, title, Jsonb(file_scope)),
)
return new_id
@@ -151,8 +150,10 @@ def set_phase(cur, work_id: str, phase: str, **fields) -> None:
sets = ["phase = %s", "updated_at = NOW()", "claimed_by = NULL"]
params: list = [phase]
for k, v in fields.items():
# last_feedback is JSONB: wrap native dict/list so psycopg3 adapts it.
# A pre-encoded JSON string is left as-is (server casts text -> jsonb).
if k == "last_feedback" and v is not None and not isinstance(v, str):
v = json.dumps(v)
v = Jsonb(v)
sets.append(f"{k} = %s")
params.append(v)
params.append(work_id)
@@ -192,5 +193,5 @@ def record_cost(cur, work_id: str, project: str, phase: str, model: str,
def emit_event(cur, work_id: str, kind: str, payload: dict) -> None:
cur.execute(
"INSERT INTO events_outbox (work_item_id, kind, payload) VALUES (%s,%s,%s)",
(work_id, kind, json.dumps(payload)),
)
(work_id, kind, Jsonb(payload)),
)

48
src/damascus/tasks.py Normal file
View File

@@ -0,0 +1,48 @@
"""Taskiq wiring — the BullMQ-equivalent queue (design doc §13).
Taskiq replaces cron as the recurring trigger. The per-tick model is
unchanged: `run_cycle` calls `cycle.tick()`, which claims one work item via
`SELECT ... FOR UPDATE SKIP LOCKED` and runs one phase. The global
concurrency cap (design doc §10) is the Taskiq worker's
`--max-threadpool-threads N` (sync tasks run in a threadpool), where
N = settings.max_concurrent. SKIP LOCKED makes N parallel ticks safe.
Launch:
taskiq worker damascus.tasks:broker --max-threadpool-threads $N # consumes + runs
taskiq scheduler damascus.tasks:scheduler # enqueues on the cron
`damascus cycle` (CLI) remains the deterministic one-shot path for operators
and the E2E suite — it calls `cycle.tick()` directly, bypassing Taskiq.
"""
from __future__ import annotations
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker
from .config import settings
# Redis is the broker transport. ListQueueBroker is a list-backed FIFO queue:
# the worker does an indefinite BRPOP on the queue key. redis-py 8.x defaults
# socket_timeout=5s, which would fire while the queue is idle and raise a
# TimeoutError that taskiq's listen() loop does NOT catch (TimeoutError is a
# sibling of ConnectionError, not a subclass) — killing the worker process.
# socket_timeout=None restores the indefinite blocking read BRPOP needs.
# socket_connect_timeout makes startup fail fast if Redis is unreachable.
broker = ListQueueBroker(
url=settings.redis_url,
socket_connect_timeout=10,
socket_timeout=None,
)
# LabelScheduleSource reads the `schedule=[...]` label off tasks below and
# enqueues them on the cron. Run exactly one scheduler process.
scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])
@broker.task(schedule=[{"cron": "* * * * *"}])
def run_cycle() -> None:
"""One orchestrator tick. Sync — Taskiq runs it in a threadpool, so the
blocking subprocess/httpx calls in the phase functions work unchanged."""
from . import cycle
cycle.tick()

View File

@@ -1,13 +1,13 @@
"""
Damascus Orchestrator E2E test configuration.
Tests run against the REAL MySQL state in the docker-compose stack.
Tests run against the REAL Postgres state in the docker-compose stack.
Each test resets the work_items table before running, then exercises
one phase transition and asserts on the resulting row state.
Why real DB, not mocks:
- The contract is about the row state, the events_outbox, the cost_ledger
- Mocking MySQL would let us test our mocks, not the system
- Mocking Postgres would let us test our mocks, not the system
- The orchestrator's value is in the row-level atomicity, not the Python code
Test isolation: every test calls reset_state() in a fixture, which:
@@ -18,33 +18,33 @@ Test isolation: every test calls reset_state() in a fixture, which:
import os
import subprocess
import time
import uuid
from pathlib import Path
import psycopg
import pytest
import pymysql
from psycopg.rows import dict_row
DAMASCUS_ROOT = Path("/root/damascus-orchestrator")
WIKI_ROOT = DAMASCUS_ROOT / "wiki"
SPECS_DIR = DAMASCUS_ROOT / "specs" / "wh40k-pc"
# Real MySQL connection (matches docker-compose env)
# When running from the HOST, use 127.0.0.1:3307 (the host-bound port).
# When running from INSIDE the orchestrator container, use db:3306 (compose service name).
# Real Postgres connection (matches docker-compose env)
# When running from the HOST, use 127.0.0.1:5432 (the host-bound port).
# When running from INSIDE the orchestrator container, use db:5432 (compose service name).
DB_CONFIG = dict(
host=os.environ.get("DAMASCUS_MYSQL_HOST", "127.0.0.1"),
port=int(os.environ.get("DAMASCUS_MYSQL_PORT", "3307")),
user=os.environ.get("DAMASCUS_MYSQL_USER", "damascus"),
password=os.environ.get("DAMASCUS_MYSQL_PASSWORD", "damascus"),
database=os.environ.get("DAMASCUS_MYSQL_DB", "damascus"),
host=os.environ.get("DAMASCUS_PG_HOST", "127.0.0.1"),
port=int(os.environ.get("DAMASCUS_PG_PORT", "5432")),
user=os.environ.get("DAMASCUS_PG_USER", "damascus"),
password=os.environ.get("DAMASCUS_PG_PASSWORD", "damascus"),
dbname=os.environ.get("DAMASCUS_PG_DB", "damascus"),
autocommit=False,
)
def get_conn():
return pymysql.connect(**DB_CONFIG)
return psycopg.connect(**DB_CONFIG, row_factory=dict_row)
def run_cycle_in_container():
@@ -58,19 +58,21 @@ def run_cycle_in_container():
def reset_state():
"""Truncate all tables. Called by fixtures before each test."""
"""Truncate all tables and restart sequences. Called by fixtures before each test."""
conn = get_conn()
try:
with conn.cursor() as cur:
for table in ("work_items", "human_issues", "cost_ledger", "events_outbox", "coordination_gates"):
cur.execute(f"DELETE FROM {table}")
cur.execute(
"TRUNCATE work_items, human_issues, cost_ledger, events_outbox, "
"coordination_gates RESTART IDENTITY CASCADE"
)
conn.commit()
finally:
conn.close()
def insert_work_item(phase="spec", story_id=None, title="Test story",
file_scope=None, budget_cycles=5):
file_scope=None, budget_cycles=5, project="wh40k-pc"):
"""Insert a single work_item. Returns the row id."""
conn = get_conn()
try:
@@ -79,8 +81,8 @@ def insert_work_item(phase="spec", story_id=None, title="Test story",
cur.execute(
"""INSERT INTO work_items
(id, project, story_id, title, phase, file_scope, budget_cycles, priority)
VALUES (%s, 'wh40k-pc', %s, %s, %s, %s, %s, 100)""",
(row_id, story_id or f"test-{uuid.uuid4().hex[:8]}", title, phase,
VALUES (%s, %s, %s, %s, %s, %s, %s, 100)""",
(row_id, project, story_id or f"test-{uuid.uuid4().hex[:8]}", title, phase,
file_scope or '["src/test.js"]', budget_cycles),
)
conn.commit()
@@ -93,7 +95,7 @@ def get_row(row_id):
"""Fetch the work_items row by id."""
conn = get_conn()
try:
with conn.cursor(pymysql.cursors.DictCursor) as cur:
with conn.cursor() as cur:
cur.execute("SELECT * FROM work_items WHERE id = %s", (row_id,))
return cur.fetchone()
finally:
@@ -104,7 +106,7 @@ def get_events(row_id):
"""Fetch all events_outbox rows for a work_item, ordered by id."""
conn = get_conn()
try:
with conn.cursor(pymysql.cursors.DictCursor) as cur:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM events_outbox WHERE work_item_id = %s ORDER BY id",
(row_id,),
@@ -117,7 +119,7 @@ def get_events(row_id):
def get_cost_rows(row_id):
conn = get_conn()
try:
with conn.cursor(pymysql.cursors.DictCursor) as cur:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM cost_ledger WHERE work_item_id = %s ORDER BY id",
(row_id,),

View File

@@ -8,47 +8,66 @@ that the contract documents promise.
If a contract changes, these tests will fail. Update them deliberately.
"""
import os
from pathlib import Path
import pymysql
import psycopg
import pytest
from psycopg.rows import tuple_row
ORCH_ROOT = Path("/root/damascus-orchestrator")
# In-container E2E runs against /root/damascus-orchestrator; CI (host) sets
# DAMASCUS_ROOT to the checked-out workspace so source-read tests resolve.
ORCH_ROOT = Path(os.environ.get("DAMASCUS_ROOT", "/root/damascus-orchestrator"))
SRC = ORCH_ROOT / "src" / "damascus"
PG_CONFIG = dict(
host=os.environ.get("DAMASCUS_PG_HOST", "127.0.0.1"),
port=int(os.environ.get("DAMASCUS_PG_PORT", "5432")),
user=os.environ.get("DAMASCUS_PG_USER", "damascus"),
password=os.environ.get("DAMASCUS_PG_PASSWORD", "damascus"),
dbname=os.environ.get("DAMASCUS_PG_DB", "damascus"),
autocommit=True,
)
def test_src_files_exist():
"""The orchestrator code has the modules the contracts reference."""
for module in ["cycle", "phases", "state", "git_ops", "llm", "cli", "relay", "wiki"]:
for module in ["cycle", "phases", "state", "git_ops", "llm", "cli", "relay",
"wiki", "tasks"]:
assert (SRC / f"{module}.py").exists(), f"missing {module}.py"
@pytest.mark.db
def test_schema_has_required_tables():
"""MySQL has all 5 tables the contracts reference."""
conn = pymysql.connect(
host="127.0.0.1", port=3307, user="damascus", password="damascus",
database="damascus", autocommit=True,
)
with conn.cursor() as cur:
cur.execute("SHOW TABLES")
tables = {row[0] for row in cur.fetchall()}
conn.close()
"""Postgres has all 5 tables the contracts reference."""
conn = psycopg.connect(**PG_CONFIG)
try:
with conn.cursor(row_factory=tuple_row) as cur:
cur.execute(
"SELECT tablename FROM pg_tables WHERE schemaname = 'public'"
)
tables = {row[0] for row in cur.fetchall()}
finally:
conn.close()
required = {"work_items", "coordination_gates", "human_issues",
"cost_ledger", "events_outbox"}
assert required.issubset(tables), f"missing tables: {required - tables}"
@pytest.mark.db
def test_workitems_has_required_columns():
"""The work_items table has every column the contracts reference."""
conn = pymysql.connect(
host="127.0.0.1", port=3307, user="damascus", password="damascus",
database="damascus", autocommit=True,
)
with conn.cursor() as cur:
cur.execute("DESCRIBE work_items")
cols = {row[0] for row in cur.fetchall()}
conn.close()
conn = psycopg.connect(**PG_CONFIG)
try:
with conn.cursor(row_factory=tuple_row) as cur:
cur.execute(
"SELECT column_name FROM information_schema.columns "
"WHERE table_schema = 'public' AND table_name = 'work_items'"
)
cols = {row[0] for row in cur.fetchall()}
finally:
conn.close()
required = {
"id", "project", "story_id", "title", "phase", "file_scope",
"attempts", "budget_cycles", "base_commit", "branch", "pr_url",
@@ -78,10 +97,27 @@ def test_typed_verdicts_in_phases():
"""The phase transitions emit the typed verdicts the design doc promises."""
phases_py = (SRC / "phases.py").read_text()
for verdict in ["pass", "tests_failed", "rebase_conflict",
"spec_ambiguous", "no_pr"]:
"spec_ambiguous", "spec_wrong", "no_pr"]:
assert verdict in phases_py, f"phases.py missing verdict {verdict}"
def test_loop_breaker_routes_to_blocked():
"""The cycle routes a budget-exhausted non-pass verdict to 'blocked' (§5/§16)."""
cycle_py = (SRC / "cycle.py").read_text()
assert "blocked" in cycle_py, "cycle.py must implement the loop-breaker -> blocked"
assert "attempts" in cycle_py and "budget_cycles" in cycle_py, (
"cycle.py must compare attempts against budget_cycles for the loop-breaker"
)
def test_taskiq_wiring_present():
"""tasks.py wires Taskiq as the BullMQ-equivalent queue (design doc §13)."""
tasks_py = (SRC / "tasks.py").read_text()
assert "ListQueueBroker" in tasks_py, "tasks.py must define a Taskiq broker"
assert "TaskiqScheduler" in tasks_py
assert "run_cycle" in tasks_py
def test_wiki_module_wires_to_mounted_dir():
"""The wiki module references the bind-mounted wiki path (via config)."""
config_py = (SRC / "config.py").read_text()

View File

@@ -79,20 +79,32 @@ def test_loop_breaker_at_budget():
assert apply_verdict("pass", 5, 5) == "merged"
@pytest.mark.db
def test_budget_decay_shape_configurable():
"""Design doc §14 open question: halving vs fixed cap. The current
implementation uses a fixed cap (the budget_cycles column).
This test asserts the schema has a budget_cycles column on work_items.
"""
import pymysql
conn = pymysql.connect(
host="127.0.0.1", port=3307, user="damascus", password="damascus",
database="damascus", autocommit=True,
import os
import psycopg
from psycopg.rows import tuple_row
conn = psycopg.connect(
host=os.environ.get("DAMASCUS_PG_HOST", "127.0.0.1"),
port=int(os.environ.get("DAMASCUS_PG_PORT", "5432")),
user=os.environ.get("DAMASCUS_PG_USER", "damascus"),
password=os.environ.get("DAMASCUS_PG_PASSWORD", "damascus"),
dbname=os.environ.get("DAMASCUS_PG_DB", "damascus"),
autocommit=True,
)
with conn.cursor() as cur:
cur.execute("DESCRIBE work_items")
cols = {row[0] for row in cur.fetchall()}
conn.close()
try:
with conn.cursor(row_factory=tuple_row) as cur:
cur.execute(
"SELECT column_name FROM information_schema.columns "
"WHERE table_schema = 'public' AND table_name = 'work_items'"
)
cols = {row[0] for row in cur.fetchall()}
finally:
conn.close()
assert "budget_cycles" in cols, "work_items must have budget_cycles column"
assert "attempts" in cols, "work_items must have attempts column"