This repository has been archived on 2026-05-23. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
damascus-frontier/ceo.py
root 60e870e817 fix: ThreadManager, Sales _needs_clarification, ceo queue command
Final integration fixes for Tier 1-4 merge:
- Added ThreadManager class to conversation.py
- Added _needs_clarification + _ask_clarification to Sales agent
- Added ./ceo.py queue command showing agent availability
2026-05-21 16:04:05 +00:00

902 lines
33 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Damascus Frontier — CEO CLI Interface.

Kay (the human CEO) uses this to direct the startup swarm.
Messages go to Redis → VP picks them up → orchestrates the team.

Usage:
    ./ceo.py dashboard                # Show all projects with progress
    ./ceo.py project <id>             # Show full project detail
    ./ceo.py approve <project_id>     # Approve a project for building
    ./ceo.py kill <project_id>        # Kill a project
    ./ceo.py watch                    # Live-updating dashboard (Ctrl+C to exit)
    ./ceo.py queue                    # Show agent availability and queued tasks
    ./ceo.py status                   # Show team status
    ./ceo.py reports                  # List generated reports
    ./ceo.py read <filename>          # Read a specific report
    ./ceo.py listen                   # Listen for live team updates
    ./ceo.py build <project_name>     # Send build directive
    ./ceo.py unblock <thread_id> [--reassign <agent>]   # Unblock a stuck thread
    ./ceo.py "message"                # Send free-form directive
"""
import os
import sys
import json
import time
import argparse
import threading
import redis
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, List, Dict
# Directory containing this script; config.yaml and shared/ are resolved against it.
PROJECT_ROOT = Path(__file__).parent
# Directory whose *.md files the report-related commands scan.
SHARED_DIR = PROJECT_ROOT / "shared" / "reports"
# Phase → progress fraction (out of 6)
# Several phases appear under two spellings that map to the same fraction.
# NOTE: key order is significant — _scan_reports_for_projects() ranks phases by
# their position in this dict, so do not reorder entries casually.
PHASE_PROGRESS = {
"ideation": 1 / 6,
"validation": 2 / 6,
"validate": 2 / 6,
"approved": 3 / 6,
"building": 4 / 6,
"build": 4 / 6,
"ship": 6 / 6,
"shipped": 6 / 6,
"killed": 0.0,
}
# Pub/sub channel on which queries and approve/kill directives are published.
QUERY_CHANNEL = "damascus:query"
# Pub/sub channel redis_query() subscribes to for the matching reply.
QUERY_RESPONSE_CHANNEL = "damascus:query:response"
QUERY_TIMEOUT = 5 # seconds to wait for VP response
# ═══════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════
def load_config() -> dict:
    """Read ``config.yaml`` beside this script, expanding ``${VAR}`` placeholders.

    Only a fixed set of environment variable names is substituted; an unset
    variable expands to the empty string.

    Returns:
        The parsed YAML mapping, or ``{}`` when the file is missing or empty.
    """
    cfg_file = PROJECT_ROOT / "config.yaml"
    if cfg_file.exists():
        text = cfg_file.read_text()
        substitutable = ("LITELLM_URL", "LITELLM_MODEL", "GITEA_URL", "GITEA_TOKEN", "GITEA_USER")
        for env_name in substitutable:
            placeholder = "${" + env_name + "}"
            text = text.replace(placeholder, os.environ.get(env_name, ""))
        parsed = yaml.safe_load(text)
        return parsed if parsed else {}
    return {}
def get_redis(config: dict) -> redis.Redis:
    """Open a Redis client from the ``redis`` section of *config* and verify it.

    Host and port fall back to ``"redis"`` and ``6379``. The connection is
    checked with ``PING`` before returning, so a connection failure raises
    here rather than on first use.
    """
    settings = config.get("redis", {})
    connection_kwargs = {
        "host": settings.get("host", "redis"),
        "port": settings.get("port", 6379),
        "decode_responses": True,
    }
    client = redis.Redis(**connection_kwargs)
    client.ping()
    return client
def get_channel(config: dict, target: str) -> str:
    """Resolve the Redis channel name for role *target*.

    Looks *target* up under ``config["redis"]["channels"]``; when there is no
    entry the conventional name ``damascus:<target>`` is returned.
    """
    redis_section = config.get("redis", {})
    channel_map = redis_section.get("channels", {})
    if target in channel_map:
        return channel_map[target]
    return f"damascus:{target}"
def send_directive(r: redis.Redis, channel: str, message: str, directive_type: str = "directive",
                   extra_context: Optional[dict] = None):
    """Publish a CEO directive as a JSON message on *channel*.

    Args:
        r: Connected Redis client used for ``PUBLISH``.
        channel: Channel name to publish on.
        message: Directive text, stored under ``"content"``.
        directive_type: Value of the message's ``"type"`` field.
        extra_context: Optional keys merged into the message ``"context"``;
            they may override the default ``"ceo"``/``"timestamp"`` entries.
            The caller's dict is not modified.
    """
    # Take the clock reading once so the envelope timestamp and the context
    # timestamp of a single message can never disagree.
    sent_at = datetime.now(timezone.utc).isoformat()

    context = {"ceo": "Kay Kayyali", "timestamp": sent_at}
    context.update(extra_context or {})

    payload = {
        "id": f"ceo-{int(time.time())}",
        "from": "ceo",
        "to": "vp",
        "type": directive_type,
        "parent": None,
        "content": message,
        "context": context,
        "timestamp": sent_at,
    }
    r.publish(channel, json.dumps(payload))
def make_progress_bar(phase: str) -> str:
    """Return a 7-character bar (filled/empty blocks) for a project phase.

    The phase is looked up case-insensitively in ``PHASE_PROGRESS``; unknown
    phases produce an empty bar.
    """
    segments = 7
    done = round(PHASE_PROGRESS.get(phase.lower(), 0.0) * segments)
    done = min(segments, max(0, done))
    return "\u2588" * done + "\u2591" * (segments - done)
def human_ago(ts: float) -> str:
    """Format the age of Unix timestamp *ts* as a short "… ago" string.

    Output is one of ``"<n>s ago"``, ``"<n>m ago"``, ``"<h>h <m>m ago"`` or
    ``"<n>d ago"``. A timestamp in the future (e.g. clock skew between hosts)
    is clamped and reported as ``"0s ago"`` instead of a negative age.
    """
    elapsed = max(0.0, time.time() - ts)
    if elapsed < 60:
        return f"{int(elapsed)}s ago"
    if elapsed < 3600:
        return f"{int(elapsed // 60)}m ago"
    if elapsed < 86400:
        hours, remainder = divmod(int(elapsed), 3600)
        return f"{hours}h {remainder // 60}m ago"
    return f"{int(elapsed // 86400)}d ago"
def parse_project_name_from_path(path: Path) -> str:
    """Derive a project name from a report path, e.g. ``prd-docu-scan.md`` → ``docu-scan``.

    The file stem is taken and the first matching category prefix
    (``prd-``, ``gtm-``, ``build-``, ``idea-``) is removed once.
    """
    stem = path.stem
    known_prefixes = ("prd-", "gtm-", "build-", "idea-")
    matched = next((p for p in known_prefixes if stem.startswith(p)), None)
    return stem if matched is None else stem[len(matched):]
# ═══════════════════════════════════════════════════════════════
# Redis Query Protocol
# ═══════════════════════════════════════════════════════════════
def redis_query(config: dict, query: dict, timeout: float = QUERY_TIMEOUT) -> Optional[dict]:
    """Publish *query* on ``QUERY_CHANNEL`` and wait for the matching reply.

    A reply matches when it is a JSON object on ``QUERY_RESPONSE_CHANNEL``
    whose ``"query_id"`` equals the query's ``"query_id"``.

    Args:
        config: Loaded configuration; the ``redis`` section supplies host/port.
        query: JSON-serialisable request.
        timeout: Seconds to wait for the reply after publishing.

    Returns:
        The decoded reply dict, or ``None`` if Redis is unreachable, publishing
        fails, or no matching reply arrives in time.
    """
    redis_cfg = config.get("redis", {})
    try:
        r = redis.Redis(
            host=redis_cfg.get("host", "redis"),
            port=redis_cfg.get("port", 6379),
            decode_responses=True,
        )
        r.ping()
    except Exception:
        return None

    sub = r.pubsub()
    try:
        # Subscribe first, then publish, so the reply cannot be missed.
        sub.subscribe(QUERY_RESPONSE_CHANNEL)

        # Wait (briefly) for the server's subscribe confirmation instead of
        # sleeping a fixed interval and hoping the subscription is live.
        confirm_deadline = time.monotonic() + 1.0
        while time.monotonic() < confirm_deadline:
            ack = sub.get_message(timeout=0.1)
            if ack and ack.get("type") == "subscribe":
                break

        r.publish(QUERY_CHANNEL, json.dumps(query))

        # Poll on this thread until the deadline; no background listener is
        # needed, so nothing is left blocked on a closed socket afterwards.
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            raw = sub.get_message(timeout=remaining)
            if not raw or raw.get("type") != "message":
                continue
            try:
                data = json.loads(raw["data"])
            except (json.JSONDecodeError, TypeError):
                continue
            # Only accept responses that match our query ID; ignore payloads
            # that are valid JSON but not objects.
            if isinstance(data, dict) and data.get("query_id") == query.get("query_id"):
                return data or None
    except Exception:
        # Best-effort protocol: any Redis/network failure means "no answer",
        # and callers fall back to the filesystem scan.
        return None
    finally:
        try:
            sub.close()
            r.connection_pool.disconnect()
        except Exception:
            pass
def query_vp_list_projects(config: dict) -> Optional[List[dict]]:
    """Request the full project list from the VP over the query channel.

    Returns:
        The reply's ``"projects"`` list (``[]`` if the key is absent) when the
        reply status is ``"ok"``; ``None`` when there is no usable reply.
    """
    request = {
        "type": "list_projects",
        "query_id": f"ceo-query-{int(time.time())}",
        "from": "ceo",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    reply = redis_query(config, request)
    if not reply or reply.get("status") != "ok":
        return None
    return reply.get("projects", [])
def query_vp_get_project(config: dict, project_id: str) -> Optional[dict]:
    """Request a single project record from the VP over the query channel.

    Returns:
        The reply's ``"project"`` value when the reply status is ``"ok"``,
        otherwise ``None``.
    """
    request = {
        "type": "get_project",
        "query_id": f"ceo-query-{int(time.time())}",
        "from": "ceo",
        "project_id": project_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    reply = redis_query(config, request)
    if not reply or reply.get("status") != "ok":
        return None
    return reply.get("project")
def send_vp_directive(config: dict, directive: dict):
    """Fire-and-forget publish of *directive* (as JSON) on ``QUERY_CHANNEL``.

    Any failure — connection, ping or publish — is deliberately ignored; the
    function always returns ``None``.
    """
    try:
        settings = config.get("redis", {})
        client = redis.Redis(
            host=settings.get("host", "redis"),
            port=settings.get("port", 6379),
            decode_responses=True,
        )
        client.ping()
        payload = json.dumps(directive)
        client.publish(QUERY_CHANNEL, payload)
    except Exception:
        # Best-effort delivery: nothing is reported to the caller.
        return
# ═══════════════════════════════════════════════════════════════
# Filesystem fallback — scan reports/ directory
# ═══════════════════════════════════════════════════════════════
def _scan_reports_for_projects() -> List[dict]:
    """Fallback: derive the project list from markdown files under ``SHARED_DIR``.

    Every ``*.md`` file is mapped to a project name via
    ``parse_project_name_from_path``. Files sharing a name are merged into one
    project whose phase is the most advanced phase implied by the directories
    the files live in, and whose ``last_update`` is the newest file mtime.

    Failure reports (``failure-*``), idea-generation meta files
    (``idea-generation-*``) and files that yield an empty name are skipped.

    Returns:
        Project dicts sorted by numeric ``"id"``. Files are visited in sorted
        path order, so IDs are stable for an unchanged directory.
    """
    if not SHARED_DIR.exists():
        return []

    skip_prefixes = ("failure-", "idea-generation-")
    # Infer phase from file category (max 8 chars for dashboard)
    cat_phase_map = {
        "ideas": "ideation",
        "prds": "validate",
        "gtm": "approved",
        "builds": "building",
        "code": "shipped",
    }
    # Phases are ranked by their position in PHASE_PROGRESS; build the ordered
    # list once rather than twice per file.
    phase_order = list(PHASE_PROGRESS)

    def phase_rank(phase: str) -> int:
        return phase_order.index(phase) if phase in PHASE_PROGRESS else 0

    projects: Dict[str, dict] = {}
    for md_file in sorted(SHARED_DIR.rglob("*.md")):
        project_name = parse_project_name_from_path(md_file)
        # Check the raw stem as well: the name has already lost a leading
        # "idea-", so "idea-generation-…" would otherwise slip through.
        if (
            not project_name
            or project_name.startswith(skip_prefixes)
            or md_file.stem.startswith(skip_prefixes)
        ):
            continue

        mtime = md_file.stat().st_mtime
        if project_name not in projects:
            projects[project_name] = {
                "id": str(len(projects) + 1),
                "name": project_name,
                "phase": "ideation",
                "last_update": mtime,
                "files": [],
                "description": "",
                "ceo_directive": "",
                "decisions": [],
                "deliverables": [],
                "gitea_repo": None,
                "report_paths": [],
            }
        entry = projects[project_name]
        entry["files"].append(md_file)
        entry["report_paths"].append(str(md_file.relative_to(PROJECT_ROOT)))

        # Only ever move a project forward to a later phase.
        inferred_phase = cat_phase_map.get(md_file.parent.name, "ideation")
        if phase_rank(inferred_phase) > phase_rank(entry["phase"]):
            entry["phase"] = inferred_phase

        if mtime > entry["last_update"]:
            entry["last_update"] = mtime

    return sorted(projects.values(), key=lambda p: int(p["id"]))
# ═══════════════════════════════════════════════════════════════
# Dashboard rendering
# ═══════════════════════════════════════════════════════════════
def render_dashboard(projects: List[dict], title: str = "DAMASCUS FRONTIER \u2014 Projects") -> str:
    """Render the project dashboard as a box-drawn table.

    All borders, the header row and the data rows are generated from a single
    column-width table, so every line of the result has the same length.

    Args:
        projects: Project dicts; the keys ``id``, ``name``, ``phase`` and
            ``last_update`` (Unix timestamp) are read, each with a fallback.
        title: Text centred in the top banner.

    Returns:
        The table as one newline-joined string (no trailing newline).
    """
    # (header, content width). Each cell gets one space of padding per side.
    columns = (("ID", 4), ("Name", 16), ("Phase", 8), ("Progress", 8), ("Last Update", 22))
    widths = [w for _, w in columns]
    inner = sum(w + 2 for w in widths) + len(widths) - 1

    def rule(left: str, joint: str, right: str) -> str:
        return left + joint.join("\u2550" * (w + 2) for w in widths) + right

    def row(cells: List[str]) -> str:
        return "\u2551 " + " \u2502 ".join(cells) + " \u2551"

    header_cells = [
        head.rjust(w) if idx == 0 else head.ljust(w)
        for idx, (head, w) in enumerate(columns)
    ]
    lines = [
        "\u2554" + "\u2550" * inner + "\u2557",
        "\u2551" + title.center(inner) + "\u2551",
        rule("\u2560", "\u2564", "\u2563"),
        row(header_cells),
        rule("\u2560", "\u256a", "\u2563"),
    ]

    if not projects:
        lines.append("\u2551" + " No projects found yet.".ljust(inner) + "\u2551")
    else:
        for proj in projects:
            # The ID is never truncated: the user needs it verbatim for
            # `project` / `approve` / `kill`, even if a long one widens the row.
            pid = str(proj.get("id", "?"))
            name = str(proj.get("name") or "unknown")
            phase = str(proj.get("phase") or "ideation")
            # Look the bar up with the full phase name; truncating first would
            # turn e.g. "validation" into an unknown phase with an empty bar.
            bar = make_progress_bar(phase)
            age = human_ago(proj.get("last_update", time.time()))
            lines.append(row([
                pid.rjust(widths[0]),
                name[:widths[1]].ljust(widths[1]),
                phase[:widths[2]].ljust(widths[2]),
                bar.ljust(widths[3]),
                age.ljust(widths[4]),
            ]))

    lines.append(rule("\u255a", "\u2567", "\u255d"))
    return "\n".join(lines)
# ═══════════════════════════════════════════════════════════════
# Command implementations
# ═══════════════════════════════════════════════════════════════
def cmd_dashboard(config: dict):
    """Print the project dashboard, surrounded by blank lines.

    The VP is asked for the project list first; when it gives no usable reply
    the list is derived from the report files on disk instead.
    """
    vp_projects = query_vp_list_projects(config)
    projects = _scan_reports_for_projects() if vp_projects is None else vp_projects
    table = render_dashboard(projects)
    print("\n" + table + "\n")
def _box_row(text: str, width: int = 60) -> str:
    """Return one ``║ text ║`` line padded (or truncated) to *width* columns between the borders."""
    import unicodedata

    def columns(s: str) -> int:
        # Emoji and other wide glyphs occupy two terminal columns.
        return sum(2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1 for ch in s)

    available = width - 2  # one space of padding on each side
    while columns(text) > available:
        text = text[:-1]
    return "\u2551 " + text + " " * (available - columns(text)) + " \u2551"


def cmd_project(config: dict, project_id: str):
    """Print the detail view for one project.

    The VP is queried first; without a usable reply the project is looked up
    (by ID or by name) in the filesystem scan. Optional sections (CEO
    directive, decisions, deliverables, Gitea repo, report files) are printed
    only when the project has data for them.

    Args:
        config: Loaded configuration.
        project_id: Project ID or name as typed by the user.
    """
    project = query_vp_get_project(config, project_id)
    if not project:
        # Fallback: scan reports for this project by ID or name
        project = next(
            (p for p in _scan_reports_for_projects()
             if p["id"] == project_id or p["name"] == project_id),
            None,
        )
    if not project:
        print(f"\n\u274c Project '{project_id}' not found.\n")
        return

    separator = "\u2560" + "\u2550" * 60 + "\u2563"
    wrap_width = 56  # 58 usable columns minus the two-space indent

    name = str(project.get("name") or project_id)
    phase = str(project.get("phase") or "unknown")
    # Lower-case for the lookup, exactly as make_progress_bar does, so the
    # percentage and the bar always agree.
    pct = int(PHASE_PROGRESS.get(phase.lower(), 0) * 100)
    description = project.get("description") or "No description available."

    print()
    print("\u2554" + "\u2550" * 60 + "\u2557")
    print(_box_row(f"Project: {name}"))
    print(separator)

    # Basic info
    print(_box_row(f"Phase: {phase}"))
    print(_box_row(f"Progress: {make_progress_bar(phase)} ({pct}%)"))
    print(_box_row(f"Last Update: {human_ago(project.get('last_update', time.time()))}"))
    print(separator)

    # Description
    print(_box_row("Description:"))
    for line in _wrap_text(str(description), wrap_width):
        print(_box_row("  " + line))

    # Each optional section prints its own leading separator, so the box never
    # ends with a separator directly above the bottom border.
    ceo_dir = project.get("ceo_directive") or ""
    if ceo_dir:
        print(separator)
        print(_box_row("\U0001f4cb CEO Directive:"))
        for line in _wrap_text(str(ceo_dir), wrap_width):
            print(_box_row("  " + line))

    decisions = project.get("decisions") or []
    if decisions:
        print(separator)
        print(_box_row("\U0001f550 Decision Timeline:"))
        for dec in decisions:
            ts = str(dec.get("timestamp", ""))[:16].replace("T", " ")
            print(_box_row(f"  [{ts}] {dec.get('decision', '')}"))

    deliverables = project.get("deliverables") or []
    if deliverables:
        print(separator)
        print(_box_row("\U0001f4e6 Deliverables:"))
        status_icons = {"complete": "\u2705", "in_progress": "\U0001f535", "pending": "\u23f3"}
        for item in deliverables:
            icon = status_icons.get(item.get("status", "pending"), "\u23f3")
            print(_box_row(f"  {icon} {item.get('name', 'unknown')}"))

    gitea_url = project.get("gitea_repo")
    if gitea_url:
        print(separator)
        print(_box_row(f"\U0001f517 Gitea Repo: {gitea_url}"))

    report_paths = project.get("report_paths") or []
    if report_paths:
        print(separator)
        print(_box_row("\U0001f4c4 Report Files:"))
        for rp in report_paths[:10]:
            print(_box_row(f"  \u2022 {rp}"))

    print("\u255a" + "\u2550" * 60 + "\u255d")
    print()
def _wrap_text(text: str, width: int) -> List[str]:
"""Wrap text to a given width."""
words = text.split()
lines = []
current = ""
for word in words:
if len(current) + len(word) + 1 > width:
if current:
lines.append(current)
current = word
else:
current = f"{current} {word}".strip()
if current:
lines.append(current)
return lines if lines else [text[:width]]
def cmd_approve(config: dict, project_id: str):
    """Approve a project for build on behalf of the CEO.

    Two messages are sent: an ``approve`` directive on the query channel
    (best-effort) and a ``BUILD:`` directive on the CEO channel. Failure to
    deliver the second is reported to the user as a failure — pub/sub keeps no
    queue, so an undelivered directive is lost, not deferred.
    """
    print(f"\n\U0001f528 Approving project '{project_id}' for build...")

    # Send BUILD directive via Redis query channel
    directive = {
        "type": "approve",
        "query_id": f"ceo-approve-{int(time.time())}",
        "from": "ceo",
        "project_id": project_id,
        "action": "BUILD",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    send_vp_directive(config, directive)

    # Also send via the CEO channel for direct VP pick-up
    try:
        r = get_redis(config)
        ceo_channel = get_channel(config, "ceo")
        message = (
            f"BUILD: {project_id} — CEO approved. "
            f"Read the PRD and GTM plan, create the Gitea repo, and build the MVP."
        )
        send_directive(r, ceo_channel, message, directive_type="build_approval",
                       extra_context={"project_id": project_id, "phase": "approved"})
    except Exception as e:
        print(f" \u26a0\ufe0f Cannot connect to Redis: {e}")
        print(" BUILD directive could not be sent.\n")
        return

    print(f" \u2705 BUILD directive sent for '{project_id}'")
    print(" VP will coordinate PM \u2192 Sales \u2192 Engineer workflow.\n")
def cmd_queue(config: dict):
    """Print agent availability and queued tasks as reported by the VP.

    Distinguishes the two failure modes: Redis being unreachable, and Redis
    being reachable while the VP does not answer the ``list_queue`` query.
    """
    width = 52

    def row(text: str, wide_glyphs: int = 0) -> str:
        # `wide_glyphs` is the number of double-width characters (status
        # icons) in `text`; each one occupies an extra terminal column.
        pad = max(width - len(text) - wide_glyphs, 0)
        return "\u2551" + text + " " * pad + "\u2551"

    print("\n" + "\u2554" + "\u2550" * width + "\u2557")
    print(row("DAMASCUS FRONTIER \u2014 Queue".center(width)))
    print("\u2560" + "\u2550" * width + "\u2563")

    # Probe Redis separately so an unanswered query is not misreported as a
    # connection problem.
    try:
        get_redis(config)
        redis_up = True
    except Exception:
        redis_up = False

    response = None
    if redis_up:
        # Query VP for queue status
        response = redis_query(config, {"type": "list_queue", "query_id": f"ceo-queue-{int(time.time())}"})

    if response and response.get("status") == "ok":
        agents = response.get("agents")
        if not isinstance(agents, dict):
            agents = {}
        for agent in ["pm", "sales", "engineer"]:
            status = str(agents.get(agent, "unknown"))
            icon = "\U0001f7e2" if status == "idle" else "\U0001f534" if status == "busy" else "\u26aa"
            print(row(f" {agent.upper():12s} {icon} {status}", wide_glyphs=1))

        tasks = [t for t in (response.get("queued_tasks") or []) if isinstance(t, dict)]
        if tasks:
            print("\u2560" + "\u2550" * width + "\u2563")
            print(row(" Queued:"))
            for t in tasks[:10]:
                print(row(
                    f" \u2022 {t.get('project_id', '?')} \u2192 {t.get('agent', '?')} "
                    f"(pri={t.get('priority', 0)})"
                ))
    elif redis_up:
        print(row(" (VP did not respond \u2014 cannot query queue)"))
    else:
        print(row(" (Redis unavailable — cannot query queue)"))

    print("\u255a" + "\u2550" * width + "\u255d\n")
def cmd_kill(config: dict, project_id: str):
    """Terminate a project on behalf of the CEO.

    Two messages are sent: a ``kill`` directive on the query channel
    (best-effort) and a ``KILL:`` directive on the CEO channel. Failure to
    deliver the second is reported to the user as a failure — pub/sub keeps no
    queue, so an undelivered directive is lost, not deferred.
    """
    print(f"\n\U0001f480 Killing project '{project_id}'...")

    # Send KILL directive via Redis query channel
    directive = {
        "type": "kill",
        "query_id": f"ceo-kill-{int(time.time())}",
        "from": "ceo",
        "project_id": project_id,
        "action": "KILL",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    send_vp_directive(config, directive)

    # Also send via CEO channel
    try:
        r = get_redis(config)
        ceo_channel = get_channel(config, "ceo")
        message = f"KILL: {project_id} — CEO terminated this project. Archive all deliverables."
        send_directive(r, ceo_channel, message, directive_type="kill_order",
                       extra_context={"project_id": project_id})
    except Exception as e:
        print(f" \u26a0\ufe0f Cannot connect to Redis: {e}")
        print(" KILL directive could not be sent.\n")
        return

    print(f" \u2705 KILL directive sent for '{project_id}'")
    print(" VP will archive this project.\n")
def cmd_watch(config: dict):
    """Redraw the dashboard in place roughly every 5 seconds until Ctrl+C.

    A temporary SIGINT handler flips a flag instead of raising, so the loop
    can finish cleanly; the cursor is hidden while running and both the cursor
    and the previous SIGINT handler are restored on exit.
    """
    import signal

    print("\n\U0001f4fa Live dashboard — refreshing every 5s (Ctrl+C to exit)...\n")

    state = {"running": True}

    def _request_stop(signum, frame):
        state["running"] = False

    previous_handler = signal.signal(signal.SIGINT, _request_stop)

    # Hide cursor
    sys.stdout.write("\033[?25l")
    sys.stdout.flush()
    try:
        while state["running"]:
            # Prefer the VP's answer; fall back to scanning report files.
            projects = query_vp_list_projects(config)
            if projects is None:
                projects = _scan_reports_for_projects()
            frame_text = render_dashboard(projects)

            # Move cursor to top, clear below, then draw the new frame.
            sys.stdout.write("\033[H\033[J" + frame_text)
            sys.stdout.write(f"\n\n Last refresh: {datetime.now().strftime('%H:%M:%S')} (Ctrl+C to exit)")
            sys.stdout.flush()

            # Sleep 5s in 200ms slices so Ctrl+C is honoured promptly.
            ticks = 0
            while state["running"] and ticks < 25:
                time.sleep(0.2)
                ticks += 1
    finally:
        # Show cursor
        sys.stdout.write("\033[?25h")
        sys.stdout.flush()
        signal.signal(signal.SIGINT, previous_handler)
        print("\n\U0001f44b Dashboard closed.\n")
def cmd_status(config: dict):
    """Print, per report category, the file count and the newest report.

    Ages are formatted with ``human_ago`` so they read the same as on the
    dashboard (e.g. ``5m ago`` rather than ``0h 5m ago``).

    Args:
        config: Accepted for signature parity with the other commands; unused.
    """
    reports_dir = SHARED_DIR
    if not reports_dir.exists():
        print("\n\U0001f4ca No reports yet. The team hasn't produced anything.")
        print(" Try: ./ceo.py \"Generate 3 business ideas\"\n")
        return

    print("\n\u2554" + "\u2550" * 50 + "\u2557")
    print("\u2551" + "DAMASCUS FRONTIER — Team Status".center(50) + "\u2551")
    print("\u2560" + "\u2550" * 50 + "\u2563")

    for cat in ("ideas", "prds", "gtm", "builds", "research", "failures"):
        cat_dir = reports_dir / cat
        if not cat_dir.exists():
            continue
        # Stat each file once and keep the mtime next to its path.
        stamped = [(f.stat().st_mtime, f) for f in cat_dir.glob("*.md")]
        if not stamped:
            continue
        newest_mtime, newest = max(stamped, key=lambda pair: pair[0])
        # No right-hand border on these rows: file names have arbitrary length.
        print(
            f"\u2551 {cat.upper():12s} {len(stamped)} files \u00b7 "
            f"latest: {newest.name} ({human_ago(newest_mtime)})"
        )

    print("\u255a" + "\u2550" * 50 + "\u255d\n")
def cmd_reports():
    """List every report under ``SHARED_DIR``, grouped by category directory.

    Categories are shown in sorted order; within a category the reports are
    listed newest first with their size in bytes, followed by a blank line.
    """
    if not SHARED_DIR.exists():
        print("No reports yet.")
        return

    print("\n\U0001f4da Reports:\n")
    for entry in sorted(SHARED_DIR.glob("*")):
        if not entry.is_dir():
            continue
        print(f" [{entry.name}/]")
        newest_first = sorted(entry.glob("*.md"), key=lambda f: f.stat().st_mtime, reverse=True)
        for report in newest_first:
            print(f" {report.name} ({report.stat().st_size:,} bytes)")
        print()
def cmd_read(filename: str):
    """Print the first 100 lines of the report matching *filename*.

    An exact file-name match is preferred. Otherwise the first report whose
    path *relative to the reports directory* contains *filename* is used, so
    text that merely appears in the installation path can no longer match
    every report. Candidates are considered in sorted path order.
    """
    match = None
    if SHARED_DIR.exists():
        candidates = sorted(SHARED_DIR.rglob("*.md"))
        match = next((md for md in candidates if md.name == filename), None)
        if match is None:
            match = next(
                (md for md in candidates
                 if filename in md.relative_to(SHARED_DIR).as_posix()),
                None,
            )

    if match is None:
        print(f"Report '{filename}' not found.")
        return

    print(f"\n\U0001f4c4 {match.relative_to(SHARED_DIR)}\n")
    print("\u2500" * 60)
    lines = match.read_text().split("\n")
    print("\n".join(lines[:100]))
    if len(lines) > 100:
        print(f"\n... ({len(lines) - 100} more lines)")
    print("\u2500" * 60)
def cmd_listen(config: dict):
    """Stream team updates from the status channel until Ctrl+C.

    Each JSON message is printed as ``<icon> [<timestamp>] <AGENT>: <content>``.
    Messages that are not valid JSON objects are skipped. The subscription is
    closed on exit, whether by Ctrl+C or by an error.
    """
    redis_cfg = config.get("redis", {})
    channels_cfg = config.get("redis", {}).get("channels", {})
    status_channel = channels_cfg.get("status", "damascus:status")
    try:
        r = redis.Redis(
            host=redis_cfg.get("host", "redis"),
            port=redis_cfg.get("port", 6379),
            decode_responses=True,
        )
        r.ping()
    except Exception as e:
        print(f"\n\u26a0\ufe0f Cannot connect to Redis: {e}\n")
        return

    print(f"\n\U0001f442 Listening for team updates on '{status_channel}' (Ctrl+C to stop)...\n")
    agent_icons = {"vp": "\U0001f3af", "pm": "\U0001f4cb", "sales": "\U0001f4b0", "engineer": "\U0001f528"}

    sub = r.pubsub()
    try:
        sub.subscribe(status_channel)
        for raw in sub.listen():
            if raw["type"] != "message":
                continue
            try:
                data = json.loads(raw["data"])
                agent = data.get("from", "unknown")
                content = data.get("content", "")
                timestamp = data.get("timestamp", "")[:19].replace("T", " ")
                icon = agent_icons.get(agent, "\U0001f4e1")
                print(f" {icon} [{timestamp}] {agent.upper()}: {content}")
            except (ValueError, TypeError, AttributeError):
                # Malformed payload (bad JSON, non-object, wrong field types):
                # skip it and keep listening.
                continue
    except KeyboardInterrupt:
        print("\n\U0001f44b Stopped listening.")
    finally:
        sub.close()
def cmd_build(config: dict, project_name: str):
    """Publish a ``BUILD:`` directive for *project_name* on the CEO channel.

    Any exception raised while connecting or sending is reported to the user
    as a Redis connection failure.
    """
    try:
        client = get_redis(config)
        target_channel = get_channel(config, "ceo")
        directive_text = (
            f"BUILD: {project_name} — Read the PRD and GTM plan, "
            f"create the Gitea repo, and build the MVP."
        )
        send_directive(client, target_channel, directive_text)
        print(f"\n\U0001f4e4 Build directive sent for: {project_name}")
        print(" VP will coordinate the engineering team.\n")
    except Exception as err:
        print(f"\n\u26a0\ufe0f Cannot connect to Redis: {err}")
        print(" Build directive could not be sent.\n")
def cmd_unblock(config: dict, thread_id: str, reassign_to: str = None):
    """Publish an ``UNBLOCK:<thread_id>`` command on the CEO channel.

    When *reassign_to* is given, `` REASSIGN:<agent>`` is appended to the
    command and echoed back to the user.
    """
    try:
        client = get_redis(config)
        target_channel = get_channel(config, "ceo")
        parts = [f"UNBLOCK:{thread_id}"]
        if reassign_to:
            parts.append(f"REASSIGN:{reassign_to}")
        send_directive(client, target_channel, " ".join(parts))
        print(f"\U0001f513 Unblock command sent for thread: {thread_id}")
        if reassign_to:
            print(f" Re-assigning to: {reassign_to}")
    except Exception as err:
        print(f"\n\u26a0\ufe0f Cannot connect to Redis: {err}\n")
def cmd_directive(config: dict, message: str):
    """Publish *message* verbatim as a free-form directive on the CEO channel.

    Any exception raised while connecting or sending is reported to the user
    as a Redis connection failure.
    """
    try:
        client = get_redis(config)
        target_channel = get_channel(config, "ceo")
        send_directive(client, target_channel, message)
        print(f"\n\U0001f4e4 Directive sent to VP: \"{message}\"")
        print(f" VP will coordinate PM \u2192 Sales \u2192 Engineer workflow.\n")
    except Exception as err:
        print(f"\n\u26a0\ufe0f Cannot connect to Redis: {err}")
        print(" Directive could not be sent.\n")
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` function.

    The first positional argument selects a command (case-insensitively).
    Anything that is not a known command is sent, together with the remaining
    arguments, as a free-form directive — with its original capitalisation.
    ``--redis-host`` / ``--redis-port`` override the ``redis`` section of
    ``config.yaml`` when given.
    """
    parser = argparse.ArgumentParser(
        description="Damascus Frontier — CEO CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  ./ceo.py dashboard
  ./ceo.py project 1
  ./ceo.py approve uptime-watch
  ./ceo.py kill docu-scan
  ./ceo.py watch
  ./ceo.py queue
  ./ceo.py status
  ./ceo.py reports
  ./ceo.py read prd-001.md
  ./ceo.py listen
  ./ceo.py build docu-scan
  ./ceo.py unblock thread-42 --reassign pm
  ./ceo.py "Generate 3 business ideas for SaaS tools"
""",
    )
    parser.add_argument("command", nargs="?", default="",
                        help="Command to run (dashboard, project, approve, kill, watch, queue, "
                             "status, reports, read, listen, build, unblock, or free-form directive)")
    parser.add_argument("args", nargs="*", default=[],
                        help="Arguments for the command")
    # Default to None so "flag not given" can be told apart from an explicit
    # value; config.yaml (and then redis:6379) applies when the flag is absent.
    parser.add_argument("--redis-host", default=None,
                        help="Redis host (overrides config.yaml; default: redis)")
    parser.add_argument("--redis-port", type=int, default=None,
                        help="Redis port (overrides config.yaml; default: 6379)")
    parser.add_argument("--reassign", default=None,
                        help="Reassign agent when unblocking (e.g., --reassign pm)")
    parsed = parser.parse_args()

    config = load_config()
    if parsed.redis_host is not None or parsed.redis_port is not None:
        redis_cfg = config.setdefault("redis", {})
        if parsed.redis_host is not None:
            redis_cfg["host"] = parsed.redis_host
        if parsed.redis_port is not None:
            redis_cfg["port"] = parsed.redis_port

    # If no command, show help
    if not parsed.command:
        parser.print_help()
        return

    cmd = parsed.command.lower()
    extra_args = parsed.args

    # Commands that take no positional argument.
    simple_commands = {
        "queue": lambda: cmd_queue(config),
        "dashboard": lambda: cmd_dashboard(config),
        "watch": lambda: cmd_watch(config),
        "status": lambda: cmd_status(config),
        "reports": cmd_reports,
        "listen": lambda: cmd_listen(config),
    }
    # Commands that require exactly one positional argument:
    # name -> (usage suffix, handler).
    one_arg_commands = {
        "project": ("<id>", lambda value: cmd_project(config, value)),
        "approve": ("<project_id>", lambda value: cmd_approve(config, value)),
        "kill": ("<project_id>", lambda value: cmd_kill(config, value)),
        "read": ("<filename>", cmd_read),
        "build": ("<project_name>", lambda value: cmd_build(config, value)),
        "unblock": ("<thread_id> [--reassign agent]",
                    lambda value: cmd_unblock(config, value, parsed.reassign)),
    }

    if cmd in simple_commands:
        simple_commands[cmd]()
        return

    if cmd in one_arg_commands:
        usage, handler = one_arg_commands[cmd]
        value = extra_args[0] if extra_args else ""
        if not value:
            print(f"Usage: ./ceo.py {cmd} {usage}\n")
            return
        handler(value)
        return

    # ── Fallback: treat as free-form directive ──
    # Use the text as typed: the lower-cased `cmd` is only for command matching
    # and must not leak into the directive sent to the VP.
    full_message = " ".join([parsed.command, *extra_args]).strip()
    cmd_directive(config, full_message)


if __name__ == "__main__":
    main()