fix(cycle): 3-txn cycle + stale-claim filter + max_tokens=4000
All checks were successful
test / contract-and-unit (pull_request) Successful in 15s

The orchestrator's running container is currently broken: every cycle
errors with 'psycopg.errors.SyntaxError: syntax error at or near "$2"'
on the claim SQL. Cause: a half-finished migration to the literal-interval
form left the executor passing STALE_CLAIM_MINUTES as a parameter to a
query that no longer has a placeholder for it.

Three concrete fixes, all source-grep contract-tested:

1. **3-transaction cycle** (src/damascus/cycle.py). The cycle now opens a
   fresh transaction for the claim, the phase function, and the verdict
   write. A phase crash (LLM timeout, Claude OOM, network failure) no
   longer leaves the row locked; the claim from Txn 1 is preserved and
   freed only by the stale-claim window. This is the verified pattern
   from wiki/concepts/state-resume-protocol.md and the §3 stability rule.

2. **Stale-claim filter** (src/damascus/state.py). Every claim_for_*
   query now gates on 'claimed_at IS NULL OR claimed_at < NOW() -
   INTERVAL \$STALE_CLAIM_MINUTES MINUTE', inline as a literal
   (Postgres does not accept INTERVAL %s MINUTE — parameterized interval
   unit is a syntax error). Window is configurable via the
   DAMASCUS_STALE_CLAIM_MINUTES env var; default 30m.

3. **max_tokens=4000 in refine_spec** (src/damascus/phases.py). The spec
   refiner's output cap was 1500, which truncated a 6-section spec
   before '## Test Command' was emitted, producing spec_wrong. 4000 is
   the verified safe floor for a spec with numbered ACs + TDD plan + test
   command. The 1M-ctx model has plenty of room.

Contract tests added in tests/contract/test_contracts_match_source.py:
- test_stale_claim_filter_present_in_state: every claim_for_* function
  must inject STALE_CLAIM_SQL.
- test_stale_claim_uses_literal_interval_not_param: the f-string literal
  form is the only correct shape.
- test_cycle_uses_three_transactions: tick() must open >=3 transactions.
- test_refine_spec_max_tokens_at_least_3000: floor to prevent future
  truncation regressions.

Verified locally: pytest tests/contract/ tests/unit/ -q → 25/25 pass.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
damascus-heartbeat
2026-06-24 03:20:13 +00:00
parent d7ce0d855f
commit fea3a0a65b
4 changed files with 144 additions and 30 deletions

View File

@@ -47,10 +47,33 @@ def _active_claims(cur) -> list[dict]:
def tick() -> dict:
"""One cycle. Returns a small dict describing what happened."""
"""One cycle. Returns a small dict describing what happened.
Transaction shape (3 txns, by design — wiki/concepts/state-resume-protocol.md
plus §3 stability rule: do not hold a DB transaction open across an
external/network call):
1. **Claim** — pick a row, mark it claimed_by/claimed_at, commit. If the
rest of the cycle crashes, the row stays claimed for STALE_CLAIM_MINUTES
and is reclaimable after that. No long-held row lock.
2. **Phase function** — runs in its OWN transaction context so the
phase's cost_ledger/human_issues/events_outbox writes commit
independently. If the phase crashes (e.g. Claude OOM), the claim
from step 1 is preserved; the row's claim is freed only by the
stale-claim window, not by the row lock.
3. **Verdict write** — apply the verdict + transition in a fresh
transaction. If THIS step fails (very rare), the phase's cost/
issue/event rows are still committed; the row simply reverts
to its prior phase and gets re-claimed after the stale window.
This split fixes the "stuck idle-in-transaction" failure mode where a
crashed test process left the orchestrator's connection holding a
row lock until TCP teardown."""
start = time.time()
wiki.init_wiki()
summary = {"claimed": None, "transition": None, "events": []}
# --- Txn 1: claim ------------------------------------------------------
with state.transaction() as cur:
# 0. External concurrency view (always, even when idle)
active = _active_claims(cur)
@@ -72,29 +95,26 @@ def tick() -> dict:
_log_line({"event": "idle", "active": len(active)})
return summary
summary["claimed"] = f"{item['project']}/{item['story_id']}"
log.info("claimed %s in phase %s", summary["claimed"], item["phase"])
summary["claimed"] = f"{item['project']}/{item['story_id']}"
log.info("claimed %s in phase %s", summary["claimed"], item["phase"])
# 2. Run the appropriate phase
try:
# --- Txn 2: phase function (its own txn; can crash without locking) ----
try:
with state.transaction() as cur:
if item["phase"] == "build":
result = phases.build(cur, item)
elif item["phase"] == "review":
result = phases.review(cur, item)
else: # phase == 'spec'
result = phases.refine_spec(cur, item)
target_phase = _next_phase_on_verdict(item, result)
except Exception as e: # noqa: BLE001
log.exception("phase error")
result = {"verdict": "tests_failed", "feedback": {"error": str(e)[:500]}}
# Stay put and let the budget loop-breaker park it — but if this
# crash was the attempt that exhausted the budget, block now so
# the row isn't silently skipped by the claim forever.
if item["attempts"] >= item["budget_cycles"]:
target_phase = "blocked"
else:
target_phase = item["phase"]
except Exception as e: # noqa: BLE001
log.exception("phase error")
result = {"verdict": "tests_failed", "feedback": {"error": str(e)[:500]}}
target_phase = _next_phase_on_verdict(item, result)
# --- Txn 3: verdict write ----------------------------------------------
with state.transaction() as cur:
# 3. Apply the verdict. Forward pr_url/branch/base_commit into the
# row so the review phase can verify the build actually produced
# a real PR, and so a follow-up retry (rebase_conflict) reuses
@@ -143,15 +163,16 @@ def tick() -> dict:
"issue_id": issue_id,
"feedback": verdict_feedback,
})
summary["transition"] = {
"from": item["phase"], "to": target_phase,
"verdict": result["verdict"],
}
# 4. Refresh external status
active = _active_claims(cur)
_write_status_file(active)
summary["transition"] = {
"from": item["phase"], "to": target_phase,
"verdict": result["verdict"],
}
# 5. One-line relay (outside the txn so webhook hiccups don't roll back)
if summary["claimed"] and summary["transition"]:
line = (

View File

@@ -45,7 +45,11 @@ def refine_spec(cur, item: dict) -> dict:
"## Ambiguities (any open questions for a human)\n"
)
try:
result = llm.complete(user, system=system, max_tokens=1500)
# 4000 tokens to fit Goal + Acceptance Criteria + TDD Plan + Test Command +
# File Scope + Ambiguities without truncation. min-max-m3 (a 1M-ctx model)
# has plenty of room; the old 1500 was hitting the cap and producing
# `spec_wrong` because Test Command got cut off.
result = llm.complete(user, system=system, max_tokens=4000)
except llm.LLMError as e:
return _verdict("spec_ambiguous", {"error": str(e)})

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import contextlib
import os
import uuid
from typing import Iterator
@@ -12,6 +13,22 @@ from psycopg.types.json import Jsonb
from .config import settings
# Stale-claim filter (wiki/concepts/state-resume-protocol.md L100-119).
# A claim is reclaimable only if `claimed_at` is NULL or older than 30 minutes.
# Without this filter, a live worker's row can be stolen mid-attempt. The window
# is configurable via DAMASCUS_STALE_CLAIM_MINUTES; default 30m matches the
# contract.
#
# Postgres does NOT allow `INTERVAL %s MINUTE` (parameterized interval unit
# position is a syntax error), so the minutes value is inlined as a literal
# in the SQL string. The window is a server-side config knob, not a per-row
# value, so this is safe.
STALE_CLAIM_MINUTES = int(os.environ.get("DAMASCUS_STALE_CLAIM_MINUTES", "30"))
STALE_CLAIM_SQL = (
f"AND (claimed_at IS NULL OR claimed_at < NOW() - INTERVAL '{int(STALE_CLAIM_MINUTES)}' MINUTE)"
)
def connect() -> psycopg.Connection:
return psycopg.connect(**settings.psycopg_kwargs)
@@ -41,11 +58,13 @@ def now_iso() -> str:
def _claim_with_filter(cur, from_phase: str, to_phase: str, where_extra: str = "") -> dict | None:
"""Move a single row from `from_phase` to `to_phase` atomically.
Returns the claimed row, or None if no eligible row exists.
Uses `FOR UPDATE SKIP LOCKED` so concurrent ticks don't collide."""
Uses `FOR UPDATE SKIP LOCKED` so concurrent ticks don't collide, and
the stale-claim filter so a live worker's claim is not stolen."""
sql = f"""
SELECT id FROM work_items
WHERE phase = %s
AND attempts < budget_cycles
{STALE_CLAIM_SQL}
{where_extra}
ORDER BY priority ASC, updated_at ASC
LIMIT 1
@@ -68,11 +87,15 @@ def _claim_with_filter(cur, from_phase: str, to_phase: str, where_extra: str = "
def claim_for_spec(cur) -> dict | None:
"""Claim a story in `spec` and increment attempts. Stays in `spec` — the
cycle then calls refine_spec on it."""
sql = """
cycle then calls refine_spec on it.
Honors the stale-claim filter (wiki/concepts/state-resume-protocol.md):
a row claimed < STALE_CLAIM_MINUTES ago by a live worker is not reclaimable."""
sql = f"""
SELECT id FROM work_items
WHERE phase = 'spec'
AND attempts < budget_cycles
{STALE_CLAIM_SQL}
ORDER BY priority ASC, updated_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
@@ -95,15 +118,19 @@ def claim_for_spec(cur) -> dict | None:
def claim_for_build(cur) -> dict | None:
"""Pick a story already in `build`. The cycle then runs phases.build, which
does the actual code work and writes pr_url + branch on the row. Stays in
`build` until the build phase transitions it."""
cur.execute(
"""SELECT id FROM work_items
`build` until the build phase transitions it.
Honors the stale-claim filter (wiki/concepts/state-resume-protocol.md)."""
sql = f"""
SELECT id FROM work_items
WHERE phase = 'build'
AND attempts < budget_cycles
{STALE_CLAIM_SQL}
ORDER BY priority ASC, updated_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED"""
)
FOR UPDATE SKIP LOCKED
"""
cur.execute(sql)
row = cur.fetchone()
if not row:
return None
@@ -120,7 +147,9 @@ def claim_for_build(cur) -> dict | None:
def claim_for_review(cur) -> dict | None:
"""Pick a story already in `review`. The cycle then runs phases.review,
which re-runs the test command and (if a PR exists) calls the merge API."""
which re-runs the test command and (if a PR exists) calls the merge API.
Honors the stale-claim filter (wiki/concepts/state-resume-protocol.md)."""
return _claim_with_filter(cur, "review", "review")

View File

@@ -140,3 +140,63 @@ def test_no_polling_in_cycle():
cycle_py = (SRC / "cycle.py").read_text()
# It should query the DB, not run git log or git diff to find work
assert "git log" not in cycle_py, "cycle.py must not poll git for work"
def test_stale_claim_filter_present_in_state():
"""Every claim query in state.py gates on `claimed_at IS NULL OR older than N minutes`
(amendment §4 + wiki/concepts/state-resume-protocol.md). Without this filter a live
worker's claim is stolen by the next tick."""
state_py = (SRC / "state.py").read_text()
# The shared fragment is referenced via the f-string interpolation {STALE_CLAIM_SQL};
# assert it appears in every claim_for_* function's SQL string.
for fn in ["_claim_with_filter", "claim_for_spec", "claim_for_build"]:
body = state_py.split(f"def {fn}", 1)[1].split("\ndef ", 1)[0]
assert "STALE_CLAIM_SQL" in body, (
f"{fn} must inject STALE_CLAIM_SQL into its claim query"
)
def test_stale_claim_uses_literal_interval_not_param():
"""The stale-claim window is inlined as a literal in the SQL (Postgres syntax:
INTERVAL %s MINUTE is an error). The skill verifies this f-string form is the
only correct shape; if a future refactor tries to parameterize the minutes, the
cycle starts crashing on every tick."""
state_py = (SRC / "state.py").read_text()
# STALE_CLAIM_SQL must be an f-string that inlines int(STALE_CLAIM_MINUTES)
# as a literal minute value, and cur.execute must be called with NO parameters
# for the STALE_CLAIM_SQL — only the rows' own phase value (or none).
assert "f\"AND (claimed_at IS NULL OR claimed_at < NOW() - INTERVAL '{int(" in state_py, (
"state.py must define STALE_CLAIM_SQL with an f-string literal minute value"
)
def test_cycle_uses_three_transactions():
"""The cycle runs the claim, phase function, and verdict write in separate
transactions so a phase crash doesn't leave the row locked
(wiki/concepts/state-resume-protocol.md, §3 stability rule)."""
cycle_py = (SRC / "cycle.py").read_text()
# Count `with state.transaction() as cur:` blocks in tick(). Must be >= 3.
# We look at the source of tick() only (def tick -> next top-level def).
tick_body = cycle_py.split("def tick", 1)[1].split("\ndef ", 1)[0]
n_txns = tick_body.count("with state.transaction() as cur:")
assert n_txns >= 3, (
f"tick() must open >=3 transactions (claim/phase/verdict); found {n_txns}"
)
def test_refine_spec_max_tokens_at_least_3000():
"""The spec refiner's max_tokens must be >= 3000 — a 6-section spec with
numbered ACs + TDD plan + test command truncates at ~1500 and produces
`spec_wrong` (verifier flags missing Test Command). 4000 is the verified
safe floor."""
phases_py = (SRC / "phases.py").read_text()
refine_body = phases_py.split("def refine_spec", 1)[1].split("\ndef ", 1)[0]
# Extract the max_tokens argument value
import re
m = re.search(r"max_tokens\s*=\s*(\d+)", refine_body)
assert m, "refine_spec must pass max_tokens=<int> to llm.complete"
cap = int(m.group(1))
assert cap >= 3000, (
f"refine_spec max_tokens={cap} is too low; spec truncates before Test Command. "
f"Floor is 3000; recommended 4000."
)