Final integration fixes for Tier 1-4 merge: - Added ThreadManager class to conversation.py - Added _needs_clarification + _ask_clarification to Sales agent - Added ./ceo.py queue command showing agent availability
902 lines
33 KiB
Python
Executable File
902 lines
33 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Damascus Frontier — CEO CLI Interface.
|
|
|
|
Kay (the human CEO) uses this to direct the startup swarm.
|
|
Messages go to Redis → VP picks them up → orchestrates the team.
|
|
|
|
Usage:
|
|
./ceo.py dashboard # Show all projects with progress
|
|
./ceo.py project <id> # Show full project detail
|
|
./ceo.py approve <project_id> # Approve a project for building
|
|
./ceo.py kill <project_id> # Kill a project
|
|
./ceo.py watch # Live-updating dashboard (Ctrl+C to exit)
|
|
./ceo.py status # Show team status
|
|
./ceo.py reports # List generated reports
|
|
./ceo.py read <filename> # Read a specific report
|
|
./ceo.py listen # Listen for live team updates
|
|
./ceo.py build <project_name> # Send build directive
|
|
./ceo.py unblock <thread_id> # Unblock a stuck thread
|
|
./ceo.py "message" # Send free-form directive
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import argparse
|
|
import threading
|
|
import redis
|
|
import yaml
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, List, Dict
|
|
|
|
|
|
PROJECT_ROOT = Path(__file__).parent
|
|
SHARED_DIR = PROJECT_ROOT / "shared" / "reports"
|
|
|
|
# Phase → progress fraction (out of 6)
|
|
PHASE_PROGRESS = {
|
|
"ideation": 1 / 6,
|
|
"validation": 2 / 6,
|
|
"validate": 2 / 6,
|
|
"approved": 3 / 6,
|
|
"building": 4 / 6,
|
|
"build": 4 / 6,
|
|
"ship": 6 / 6,
|
|
"shipped": 6 / 6,
|
|
"killed": 0.0,
|
|
}
|
|
|
|
QUERY_CHANNEL = "damascus:query"
|
|
QUERY_RESPONSE_CHANNEL = "damascus:query:response"
|
|
QUERY_TIMEOUT = 5 # seconds to wait for VP response
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Helpers
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def load_config() -> dict:
|
|
"""Load config with env var substitution."""
|
|
config_path = PROJECT_ROOT / "config.yaml"
|
|
if not config_path.exists():
|
|
return {}
|
|
raw = config_path.read_text()
|
|
for key in ["LITELLM_URL", "LITELLM_MODEL", "GITEA_URL", "GITEA_TOKEN", "GITEA_USER"]:
|
|
raw = raw.replace(f"${{{key}}}", os.environ.get(key, ""))
|
|
return yaml.safe_load(raw) or {}
|
|
|
|
|
|
def get_redis(config: dict) -> redis.Redis:
|
|
"""Connect to Redis."""
|
|
redis_cfg = config.get("redis", {})
|
|
r = redis.Redis(
|
|
host=redis_cfg.get("host", "redis"),
|
|
port=redis_cfg.get("port", 6379),
|
|
decode_responses=True,
|
|
)
|
|
r.ping()
|
|
return r
|
|
|
|
|
|
def get_channel(config: dict, target: str) -> str:
|
|
"""Get Redis channel for a role."""
|
|
channels = config.get("redis", {}).get("channels", {})
|
|
return channels.get(target, f"damascus:{target}")
|
|
|
|
|
|
def send_directive(r: redis.Redis, channel: str, message: str, directive_type: str = "directive",
|
|
extra_context: dict = None):
|
|
"""Send a CEO directive to a channel."""
|
|
msg = {
|
|
"id": f"ceo-{int(time.time())}",
|
|
"from": "ceo",
|
|
"to": "vp",
|
|
"type": directive_type,
|
|
"parent": None,
|
|
"content": message,
|
|
"context": {
|
|
"ceo": "Kay Kayyali",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
**(extra_context or {}),
|
|
},
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
r.publish(channel, json.dumps(msg))
|
|
|
|
|
|
def make_progress_bar(phase: str) -> str:
|
|
"""Build a 7-segment progress bar based on project phase."""
|
|
fraction = PHASE_PROGRESS.get(phase.lower(), 0.0)
|
|
filled = round(fraction * 7)
|
|
filled = max(0, min(7, filled))
|
|
bar = "\u2588" * filled + "\u2591" * (7 - filled)
|
|
return bar
|
|
|
|
|
|
def human_ago(ts: float) -> str:
|
|
"""Convert a Unix timestamp into a human 'ago' string."""
|
|
diff = time.time() - ts
|
|
if diff < 60:
|
|
return f"{int(diff)}s ago"
|
|
elif diff < 3600:
|
|
return f"{int(diff // 60)}m ago"
|
|
elif diff < 86400:
|
|
h = int(diff // 3600)
|
|
m = int((diff % 3600) // 60)
|
|
return f"{h}h {m}m ago"
|
|
else:
|
|
return f"{int(diff // 86400)}d ago"
|
|
|
|
|
|
def parse_project_name_from_path(path: Path) -> str:
|
|
"""Extract a project name from a report file path."""
|
|
# Try to derive from filename like prd-docu-scan.md → docu-scan
|
|
name = path.stem
|
|
for prefix in ["prd-", "gtm-", "build-", "idea-"]:
|
|
if name.startswith(prefix):
|
|
name = name[len(prefix):]
|
|
break
|
|
return name
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Redis Query Protocol
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def redis_query(config: dict, query: dict, timeout: float = QUERY_TIMEOUT) -> Optional[dict]:
|
|
"""Publish a query to the VP via damascus:query and wait for response.
|
|
Returns the parsed JSON response or None on timeout.
|
|
"""
|
|
redis_cfg = config.get("redis", {})
|
|
try:
|
|
r = redis.Redis(
|
|
host=redis_cfg.get("host", "redis"),
|
|
port=redis_cfg.get("port", 6379),
|
|
decode_responses=True,
|
|
)
|
|
r.ping()
|
|
except Exception:
|
|
return None
|
|
|
|
response_data = None
|
|
response_event = threading.Event()
|
|
|
|
# Subscribe first, then publish
|
|
sub = r.pubsub()
|
|
sub.subscribe(QUERY_RESPONSE_CHANNEL)
|
|
|
|
def _listen():
|
|
nonlocal response_data
|
|
for raw in sub.listen():
|
|
if response_event.is_set():
|
|
break
|
|
if raw["type"] != "message":
|
|
continue
|
|
try:
|
|
data = json.loads(raw["data"])
|
|
# Only accept responses that match our query ID
|
|
if data.get("query_id") == query.get("query_id"):
|
|
response_data = data
|
|
response_event.set()
|
|
except (json.JSONDecodeError, KeyError):
|
|
pass
|
|
|
|
listener = threading.Thread(target=_listen, daemon=True)
|
|
listener.start()
|
|
|
|
# Small delay to ensure subscriber is ready
|
|
time.sleep(0.1)
|
|
|
|
try:
|
|
r.publish(QUERY_CHANNEL, json.dumps(query))
|
|
except Exception:
|
|
sub.close()
|
|
return None
|
|
|
|
# Wait for response
|
|
responded = response_event.wait(timeout=timeout)
|
|
sub.close()
|
|
|
|
if responded and response_data:
|
|
return response_data
|
|
return None
|
|
|
|
|
|
def query_vp_list_projects(config: dict) -> Optional[List[dict]]:
|
|
"""Ask VP for all projects."""
|
|
query_id = f"ceo-query-{int(time.time())}"
|
|
result = redis_query(config, {
|
|
"type": "list_projects",
|
|
"query_id": query_id,
|
|
"from": "ceo",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
if result and result.get("status") == "ok":
|
|
return result.get("projects", [])
|
|
return None
|
|
|
|
|
|
def query_vp_get_project(config: dict, project_id: str) -> Optional[dict]:
|
|
"""Ask VP for a specific project."""
|
|
query_id = f"ceo-query-{int(time.time())}"
|
|
result = redis_query(config, {
|
|
"type": "get_project",
|
|
"query_id": query_id,
|
|
"from": "ceo",
|
|
"project_id": project_id,
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
if result and result.get("status") == "ok":
|
|
return result.get("project", None)
|
|
return None
|
|
|
|
|
|
def send_vp_directive(config: dict, directive: dict):
|
|
"""Send a directive to VP via the query channel."""
|
|
try:
|
|
redis_cfg = config.get("redis", {})
|
|
r = redis.Redis(
|
|
host=redis_cfg.get("host", "redis"),
|
|
port=redis_cfg.get("port", 6379),
|
|
decode_responses=True,
|
|
)
|
|
r.ping()
|
|
r.publish(QUERY_CHANNEL, json.dumps(directive))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Filesystem fallback — scan reports/ directory
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def _scan_reports_for_projects() -> List[dict]:
|
|
"""Fallback: derive project list from files in shared/reports/."""
|
|
projects: Dict[str, dict] = {}
|
|
|
|
if not SHARED_DIR.exists():
|
|
return []
|
|
|
|
for md_file in SHARED_DIR.rglob("*.md"):
|
|
project_name = parse_project_name_from_path(md_file)
|
|
if not project_name or project_name.startswith("failure-") or project_name.startswith("idea-generation-"):
|
|
# Skip failure reports and idea-generation meta files
|
|
if project_name.startswith("failure-"):
|
|
continue
|
|
|
|
if project_name not in projects:
|
|
projects[project_name] = {
|
|
"id": str(len(projects) + 1),
|
|
"name": project_name,
|
|
"phase": "ideation",
|
|
"last_update": md_file.stat().st_mtime,
|
|
"files": [],
|
|
"description": "",
|
|
"ceo_directive": "",
|
|
"decisions": [],
|
|
"deliverables": [],
|
|
"gitea_repo": None,
|
|
"report_paths": [],
|
|
}
|
|
|
|
cat = md_file.parent.name
|
|
projects[project_name]["files"].append(md_file)
|
|
projects[project_name]["report_paths"].append(str(md_file.relative_to(PROJECT_ROOT)))
|
|
|
|
# Infer phase from file category (max 8 chars for dashboard)
|
|
cat_phase_map = {
|
|
"ideas": "ideation",
|
|
"prds": "validate",
|
|
"gtm": "approved",
|
|
"builds": "building",
|
|
"code": "shipped",
|
|
}
|
|
inferred_phase = cat_phase_map.get(cat, "ideation")
|
|
current_phase_rank = list(PHASE_PROGRESS.keys()).index(projects[project_name]["phase"]) if projects[project_name]["phase"] in PHASE_PROGRESS else 0
|
|
new_phase_rank = list(PHASE_PROGRESS.keys()).index(inferred_phase) if inferred_phase in PHASE_PROGRESS else 0
|
|
if new_phase_rank > current_phase_rank:
|
|
projects[project_name]["phase"] = inferred_phase
|
|
|
|
# Update last_update
|
|
mtime = md_file.stat().st_mtime
|
|
if mtime > projects[project_name]["last_update"]:
|
|
projects[project_name]["last_update"] = mtime
|
|
|
|
# Sort by ID
|
|
return sorted(projects.values(), key=lambda p: int(p["id"]))
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Dashboard rendering
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def render_dashboard(projects: List[dict], title: str = "DAMASCUS FRONTIER \u2014 Projects") -> str:
|
|
"""Render the full dashboard table."""
|
|
lines = []
|
|
width = 70
|
|
padding = max(width - len(title) - 2, 0) // 2
|
|
|
|
lines.append("\u2554" + "\u2550" * width + "\u2557")
|
|
lines.append("\u2551" + " " * padding + title + " " * (width - len(title) - padding) + "\u2551")
|
|
lines.append("\u2560\u2550\u2550\u2550\u2550\u2550\u2550\u2564\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2564\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2564\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2564\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2563")
|
|
lines.append("\u2551 ID \u2502 Name \u2502 Phase \u2502 Progress \u2502 Last Update \u2551")
|
|
lines.append("\u2560\u2550\u2550\u2550\u2550\u2550\u2550\u256a\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u256a\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u256a\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u256a\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2563")
|
|
|
|
if not projects:
|
|
lines.append("\u2551 No projects found yet. \u2551")
|
|
else:
|
|
for proj in projects:
|
|
pid = proj.get("id", "?")
|
|
name = proj.get("name", "unknown")[:16]
|
|
phase = proj.get("phase", "ideation")[:8]
|
|
bar = make_progress_bar(phase)
|
|
last_ts = proj.get("last_update", time.time())
|
|
last_str = human_ago(last_ts)
|
|
|
|
# Pad fields
|
|
pid_str = str(pid).rjust(4)
|
|
name_str = name.ljust(16)
|
|
phase_str = phase.ljust(8)
|
|
bar_str = bar.ljust(7)
|
|
bar_str += " "
|
|
last_str = last_str.ljust(22)
|
|
|
|
lines.append(
|
|
f"\u2551 {pid_str} \u2502 {name_str} \u2502 {phase_str} \u2502 {bar_str} \u2502 {last_str} \u2551"
|
|
)
|
|
|
|
lines.append("\u255a\u2550\u2550\u2550\u2550\u2550\u2550\u2567\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2567\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2567\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2567\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u255d")
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Command implementations
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def cmd_dashboard(config: dict):
|
|
"""Show all projects in a dashboard table."""
|
|
# Try Redis query first
|
|
projects = query_vp_list_projects(config)
|
|
|
|
# Fallback to filesystem scan
|
|
if projects is None:
|
|
projects = _scan_reports_for_projects()
|
|
|
|
print()
|
|
print(render_dashboard(projects))
|
|
print()
|
|
|
|
|
|
def cmd_project(config: dict, project_id: str):
|
|
"""Show full project detail."""
|
|
project = None
|
|
|
|
# Try Redis query first
|
|
redis_project = query_vp_get_project(config, project_id)
|
|
if redis_project:
|
|
project = redis_project
|
|
else:
|
|
# Fallback: scan reports for this project by ID or name
|
|
all_projects = _scan_reports_for_projects()
|
|
for p in all_projects:
|
|
if p["id"] == project_id or p["name"] == project_id:
|
|
project = p
|
|
break
|
|
|
|
if not project:
|
|
print(f"\n\u274c Project '{project_id}' not found.\n")
|
|
return
|
|
|
|
print()
|
|
print("\u2554" + "\u2550" * 60 + "\u2557")
|
|
print(f"\u2551 Project: {project['name'][:46].ljust(46)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Basic info
|
|
phase = project.get("phase", "unknown")
|
|
description = project.get("description", "No description available.")
|
|
print(f"\u2551 Phase: {phase.ljust(48)} \u2551")
|
|
pct = int(PHASE_PROGRESS.get(phase, 0) * 100)
|
|
print(f"\u2551 Progress: {make_progress_bar(phase)} ({pct}%)\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \u2551")
|
|
print(f"\u2551 Last Update: {human_ago(project.get('last_update', time.time())).ljust(44)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Description
|
|
print(f"\u2551 Description: {' ' * 45} \u2551")
|
|
for line in _wrap_text(description, 56):
|
|
print(f"\u2551 {line.ljust(54)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# CEO Directive
|
|
ceo_dir = project.get("ceo_directive", "")
|
|
if ceo_dir:
|
|
print(f"\u2551 \U0001f4cb CEO Directive: {' ' * 41} \u2551")
|
|
for line in _wrap_text(ceo_dir, 56):
|
|
print(f"\u2551 {line.ljust(54)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Timeline of decisions
|
|
decisions = project.get("decisions", [])
|
|
if decisions:
|
|
print(f"\u2551 \U0001f550 Decision Timeline: {' ' * 38} \u2551")
|
|
for dec in decisions:
|
|
ts = dec.get("timestamp", "")[:16].replace("T", " ")
|
|
d = dec.get("decision", "")
|
|
print(f"\u2551 [{ts}] {d[:48].ljust(48)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Deliverables
|
|
deliverables = project.get("deliverables", [])
|
|
if deliverables:
|
|
print(f"\u2551 \U0001f4e6 Deliverables: {' ' * 42} \u2551")
|
|
for d in deliverables:
|
|
name = d.get("name", "unknown")
|
|
status = d.get("status", "pending")
|
|
icon = {"complete": "\u2705", "in_progress": "\U0001f535", "pending": "\u23f3"}.get(status, "\u23f3")
|
|
print(f"\u2551 {icon} {name[:50].ljust(50)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Gitea repo
|
|
gitea_url = project.get("gitea_repo")
|
|
if gitea_url:
|
|
print(f"\u2551 \U0001f517 Gitea Repo: {gitea_url[:47].ljust(47)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
# Report file paths
|
|
report_paths = project.get("report_paths", [])
|
|
if report_paths:
|
|
print(f"\u2551 \U0001f4c4 Report Files: {' ' * 42} \u2551")
|
|
for rp in report_paths[:10]:
|
|
print(f"\u2551 \u2022 {rp[:53].ljust(53)} \u2551")
|
|
print("\u2560" + "\u2550" * 60 + "\u2563")
|
|
|
|
print("\u255a" + "\u2550" * 60 + "\u255d")
|
|
print()
|
|
|
|
|
|
def _wrap_text(text: str, width: int) -> List[str]:
|
|
"""Wrap text to a given width."""
|
|
words = text.split()
|
|
lines = []
|
|
current = ""
|
|
for word in words:
|
|
if len(current) + len(word) + 1 > width:
|
|
if current:
|
|
lines.append(current)
|
|
current = word
|
|
else:
|
|
current = f"{current} {word}".strip()
|
|
if current:
|
|
lines.append(current)
|
|
return lines if lines else [text[:width]]
|
|
|
|
|
|
def cmd_approve(config: dict, project_id: str):
|
|
"""CEO explicitly approves a project for build."""
|
|
print(f"\n\U0001f528 Approving project '{project_id}' for build...")
|
|
|
|
# Send BUILD directive via Redis query channel
|
|
query_id = f"ceo-approve-{int(time.time())}"
|
|
directive = {
|
|
"type": "approve",
|
|
"query_id": query_id,
|
|
"from": "ceo",
|
|
"project_id": project_id,
|
|
"action": "BUILD",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
send_vp_directive(config, directive)
|
|
|
|
# Also send via the CEO channel for direct VP pick-up
|
|
try:
|
|
r = get_redis(config)
|
|
ceo_channel = get_channel(config, "ceo")
|
|
message = (
|
|
f"BUILD: {project_id} — CEO approved. "
|
|
f"Read the PRD and GTM plan, create the Gitea repo, and build the MVP."
|
|
)
|
|
send_directive(r, ceo_channel, message, directive_type="build_approval",
|
|
extra_context={"project_id": project_id, "phase": "approved"})
|
|
print(f" \u2705 BUILD directive sent for '{project_id}'")
|
|
print(f" VP will coordinate PM \u2192 Sales \u2192 Engineer workflow.\n")
|
|
except Exception as e:
|
|
print(f" \u26a0\ufe0f Redis unavailable, but directive queued: {e}\n")
|
|
|
|
|
|
def cmd_queue(config: dict):
|
|
"""Show agent availability and queued tasks."""
|
|
print("\n" + "\u2554" + "\u2550" * 52 + "\u2557")
|
|
print("\u2551 DAMASCUS FRONTIER \u2014 Queue \u2551")
|
|
print("\u2560" + "\u2550" * 52 + "\u2563")
|
|
|
|
# Try Redis query for agent status
|
|
try:
|
|
redis_cfg = config.get("redis", {})
|
|
r = redis.Redis(host=redis_cfg.get("host", "redis"), port=redis_cfg.get("port", 6379), decode_responses=True)
|
|
r.ping()
|
|
# Query VP for queue status
|
|
response = redis_query(config, {"type": "list_queue", "query_id": f"ceo-queue-{int(time.time())}"})
|
|
if response and response.get("status") == "ok":
|
|
agents = response.get("agents", {})
|
|
for agent in ["pm", "sales", "engineer"]:
|
|
status = agents.get(agent, "unknown")
|
|
icon = "\U0001f7e2" if status == "idle" else "\U0001f534" if status == "busy" else "\u26aa"
|
|
print(f"\u2551 {agent.upper():12s} {icon} {status.ljust(32)} \u2551")
|
|
tasks = response.get("queued_tasks", [])
|
|
if tasks:
|
|
print("\u2560" + "\u2550" * 52 + "\u2563")
|
|
print("\u2551 Queued: \u2551")
|
|
for t in tasks[:10]:
|
|
print(f"\u2551 \u2022 {t.get('project_id', '?')} \u2192 {t.get('agent', '?')} (pri={t.get('priority', 0)}) \u2551")
|
|
print("\u255a" + "\u2550" * 52 + "\u255d\n")
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback
|
|
print("\u2551 (Redis unavailable — cannot query queue) \u2551")
|
|
print("\u255a" + "\u2550" * 52 + "\u255d\n")
|
|
|
|
|
|
|
|
def cmd_kill(config: dict, project_id: str):
|
|
"""CEO kills a project."""
|
|
print(f"\n\U0001f480 Killing project '{project_id}'...")
|
|
|
|
# Send KILL directive via Redis query channel
|
|
query_id = f"ceo-kill-{int(time.time())}"
|
|
directive = {
|
|
"type": "kill",
|
|
"query_id": query_id,
|
|
"from": "ceo",
|
|
"project_id": project_id,
|
|
"action": "KILL",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
send_vp_directive(config, directive)
|
|
|
|
# Also send via CEO channel
|
|
try:
|
|
r = get_redis(config)
|
|
ceo_channel = get_channel(config, "ceo")
|
|
message = f"KILL: {project_id} — CEO terminated this project. Archive all deliverables."
|
|
send_directive(r, ceo_channel, message, directive_type="kill_order",
|
|
extra_context={"project_id": project_id})
|
|
print(f" \u2705 KILL directive sent for '{project_id}'")
|
|
print(f" VP will archive this project.\n")
|
|
except Exception as e:
|
|
print(f" \u26a0\ufe0f Redis unavailable, but directive queued: {e}\n")
|
|
|
|
|
|
def cmd_watch(config: dict):
|
|
"""Live-updating dashboard that refreshes every 5s."""
|
|
import signal
|
|
|
|
print("\n\U0001f4fa Live dashboard — refreshing every 5s (Ctrl+C to exit)...\n")
|
|
running = True
|
|
|
|
def on_sigint(sig, frame):
|
|
nonlocal running
|
|
running = False
|
|
|
|
original_sigint = signal.signal(signal.SIGINT, on_sigint)
|
|
|
|
# Hide cursor
|
|
sys.stdout.write("\033[?25l")
|
|
sys.stdout.flush()
|
|
|
|
try:
|
|
while running:
|
|
# Get projects
|
|
projects = query_vp_list_projects(config)
|
|
if projects is None:
|
|
projects = _scan_reports_for_projects()
|
|
|
|
dashboard = render_dashboard(projects)
|
|
|
|
# Move cursor to top, clear below
|
|
sys.stdout.write("\033[H\033[J")
|
|
sys.stdout.write(dashboard)
|
|
sys.stdout.write(f"\n\n Last refresh: {datetime.now().strftime('%H:%M:%S')} (Ctrl+C to exit)")
|
|
sys.stdout.flush()
|
|
|
|
# Wait 5s but check running flag every 200ms
|
|
for _ in range(25):
|
|
if not running:
|
|
break
|
|
time.sleep(0.2)
|
|
finally:
|
|
# Show cursor
|
|
sys.stdout.write("\033[?25h")
|
|
sys.stdout.flush()
|
|
signal.signal(signal.SIGINT, original_sigint)
|
|
print("\n\U0001f44b Dashboard closed.\n")
|
|
|
|
|
|
def cmd_status(config: dict):
|
|
"""Show the team's current status."""
|
|
reports_dir = SHARED_DIR
|
|
if not reports_dir.exists():
|
|
print("\n\U0001f4ca No reports yet. The team hasn't produced anything.")
|
|
print(" Try: ./ceo.py \"Generate 3 business ideas\"\n")
|
|
return
|
|
|
|
print("\n\u2554" + "\u2550" * 50 + "\u2557")
|
|
print("\u2551 DAMASCUS FRONTIER — Team Status \u2551")
|
|
print("\u2560" + "\u2550" * 50 + "\u2563")
|
|
|
|
categories = ["ideas", "prds", "gtm", "builds", "research", "failures"]
|
|
for cat in categories:
|
|
cat_dir = reports_dir / cat
|
|
if cat_dir.exists():
|
|
files = sorted(cat_dir.glob("*.md"), key=lambda f: f.stat().st_mtime, reverse=True)
|
|
if files:
|
|
latest = files[0]
|
|
age = time.time() - latest.stat().st_mtime
|
|
if age < 86400:
|
|
age_str = f"{int(age // 3600)}h {int((age % 3600) // 60)}m ago"
|
|
else:
|
|
age_str = f"{int(age // 86400)}d ago"
|
|
print(f"\u2551 {cat.upper():12s} {len(files)} files \u00b7 latest: {latest.name} ({age_str})")
|
|
|
|
print("\u255a" + "\u2550" * 50 + "\u255d\n")
|
|
|
|
|
|
def cmd_reports():
|
|
"""List all generated reports."""
|
|
if not SHARED_DIR.exists():
|
|
print("No reports yet.")
|
|
return
|
|
|
|
print("\n\U0001f4da Reports:\n")
|
|
for cat_dir in sorted(SHARED_DIR.glob("*")):
|
|
if cat_dir.is_dir():
|
|
print(f" [{cat_dir.name}/]")
|
|
for report in sorted(cat_dir.glob("*.md"), key=lambda f: f.stat().st_mtime, reverse=True):
|
|
size = report.stat().st_size
|
|
print(f" {report.name} ({size:,} bytes)")
|
|
print()
|
|
|
|
|
|
def cmd_read(filename: str):
|
|
"""Read a specific report."""
|
|
found = False
|
|
if not SHARED_DIR.exists():
|
|
print(f"Report '{filename}' not found.")
|
|
return
|
|
|
|
for md in SHARED_DIR.rglob("*.md"):
|
|
if md.name == filename or filename in str(md):
|
|
print(f"\n\U0001f4c4 {md.relative_to(SHARED_DIR)}\n")
|
|
print("\u2500" * 60)
|
|
content = md.read_text()
|
|
lines = content.split("\n")
|
|
print("\n".join(lines[:100]))
|
|
if len(lines) > 100:
|
|
print(f"\n... ({len(lines) - 100} more lines)")
|
|
print("\u2500" * 60)
|
|
found = True
|
|
break
|
|
|
|
if not found:
|
|
print(f"Report '{filename}' not found.")
|
|
|
|
|
|
def cmd_listen(config: dict):
|
|
"""Listen to status channel for live updates from the team."""
|
|
redis_cfg = config.get("redis", {})
|
|
channels_cfg = config.get("redis", {}).get("channels", {})
|
|
status_channel = channels_cfg.get("status", "damascus:status")
|
|
|
|
try:
|
|
r = redis.Redis(
|
|
host=redis_cfg.get("host", "redis"),
|
|
port=redis_cfg.get("port", 6379),
|
|
decode_responses=True,
|
|
)
|
|
r.ping()
|
|
except Exception as e:
|
|
print(f"\n\u26a0\ufe0f Cannot connect to Redis: {e}\n")
|
|
return
|
|
|
|
print(f"\n\U0001f442 Listening for team updates on '{status_channel}' (Ctrl+C to stop)...\n")
|
|
|
|
sub = r.pubsub()
|
|
sub.subscribe(status_channel)
|
|
|
|
try:
|
|
for raw in sub.listen():
|
|
if raw["type"] != "message":
|
|
continue
|
|
try:
|
|
data = json.loads(raw["data"])
|
|
agent = data.get("from", "unknown")
|
|
content = data.get("content", "")
|
|
timestamp = data.get("timestamp", "")[:19].replace("T", " ")
|
|
|
|
icon = {"vp": "\U0001f3af", "pm": "\U0001f4cb", "sales": "\U0001f4b0", "engineer": "\U0001f528"}.get(agent, "\U0001f4e1")
|
|
print(f" {icon} [{timestamp}] {agent.upper()}: {content}")
|
|
except Exception:
|
|
pass
|
|
except KeyboardInterrupt:
|
|
print("\n\U0001f44b Stopped listening.")
|
|
|
|
|
|
def cmd_build(config: dict, project_name: str):
|
|
"""Trigger a build for a specific project."""
|
|
try:
|
|
r = get_redis(config)
|
|
ceo_channel = get_channel(config, "ceo")
|
|
message = f"BUILD: {project_name} — Read the PRD and GTM plan, create the Gitea repo, and build the MVP."
|
|
send_directive(r, ceo_channel, message)
|
|
print(f"\n\U0001f4e4 Build directive sent for: {project_name}")
|
|
print(" VP will coordinate the engineering team.\n")
|
|
except Exception as e:
|
|
print(f"\n\u26a0\ufe0f Cannot connect to Redis: {e}")
|
|
print(" Build directive could not be sent.\n")
|
|
|
|
|
|
def cmd_unblock(config: dict, thread_id: str, reassign_to: str = None):
|
|
"""Send an unblock command to the VP for a specific thread."""
|
|
try:
|
|
r = get_redis(config)
|
|
ceo_channel = get_channel(config, "ceo")
|
|
message = f"UNBLOCK:{thread_id}"
|
|
if reassign_to:
|
|
message += f" REASSIGN:{reassign_to}"
|
|
send_directive(r, ceo_channel, message)
|
|
print(f"\U0001f513 Unblock command sent for thread: {thread_id}")
|
|
if reassign_to:
|
|
print(f" Re-assigning to: {reassign_to}")
|
|
except Exception as e:
|
|
print(f"\n\u26a0\ufe0f Cannot connect to Redis: {e}\n")
|
|
|
|
|
|
def cmd_directive(config: dict, message: str):
|
|
"""Send a free-form CEO directive."""
|
|
try:
|
|
r = get_redis(config)
|
|
ceo_channel = get_channel(config, "ceo")
|
|
send_directive(r, ceo_channel, message)
|
|
print(f"\n\U0001f4e4 Directive sent to VP: \"{message}\"")
|
|
print(f" VP will coordinate PM \u2192 Sales \u2192 Engineer workflow.\n")
|
|
except Exception as e:
|
|
print(f"\n\u26a0\ufe0f Cannot connect to Redis: {e}")
|
|
print(" Directive could not be sent.\n")
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Main
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Damascus Frontier — CEO CLI",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
./ceo.py dashboard
|
|
./ceo.py project 1
|
|
./ceo.py approve uptime-watch
|
|
./ceo.py kill docu-scan
|
|
./ceo.py watch
|
|
./ceo.py status
|
|
./ceo.py reports
|
|
./ceo.py read prd-001.md
|
|
./ceo.py listen
|
|
./ceo.py build docu-scan
|
|
./ceo.py "Generate 3 business ideas for SaaS tools"
|
|
""",
|
|
)
|
|
|
|
parser.add_argument("command", nargs="?", default="",
|
|
help="Command to run (dashboard, project, approve, kill, watch, "
|
|
"status, reports, read, listen, build, unblock, or free-form directive)")
|
|
|
|
parser.add_argument("args", nargs="*", default=[],
|
|
help="Arguments for the command")
|
|
|
|
parser.add_argument("--redis-host", default="redis",
|
|
help="Redis host (default: redis)")
|
|
parser.add_argument("--redis-port", type=int, default=6379,
|
|
help="Redis port (default: 6379)")
|
|
parser.add_argument("--reassign", default=None,
|
|
help="Reassign agent when unblocking (e.g., --reassign pm)")
|
|
|
|
parsed = parser.parse_args()
|
|
|
|
config = load_config()
|
|
|
|
# If no command, show help
|
|
if not parsed.command:
|
|
parser.print_help()
|
|
return
|
|
|
|
cmd = parsed.command.lower()
|
|
extra_args = parsed.args
|
|
|
|
if cmd == "queue":
|
|
cmd_queue(config)
|
|
return
|
|
|
|
# ── New dashboard commands ──
|
|
if cmd == "dashboard":
|
|
cmd_dashboard(config)
|
|
return
|
|
|
|
if cmd == "project":
|
|
pid = extra_args[0] if extra_args else ""
|
|
if not pid:
|
|
print("Usage: ./ceo.py project <id>\n")
|
|
return
|
|
cmd_project(config, pid)
|
|
return
|
|
|
|
if cmd == "approve":
|
|
pid = extra_args[0] if extra_args else ""
|
|
if not pid:
|
|
print("Usage: ./ceo.py approve <project_id>\n")
|
|
return
|
|
cmd_approve(config, pid)
|
|
return
|
|
|
|
if cmd == "kill":
|
|
pid = extra_args[0] if extra_args else ""
|
|
if not pid:
|
|
print("Usage: ./ceo.py kill <project_id>\n")
|
|
return
|
|
cmd_kill(config, pid)
|
|
return
|
|
|
|
if cmd == "watch":
|
|
cmd_watch(config)
|
|
return
|
|
|
|
# ── Existing commands ──
|
|
if cmd == "status":
|
|
cmd_status(config)
|
|
return
|
|
|
|
if cmd == "reports":
|
|
cmd_reports()
|
|
return
|
|
|
|
if cmd == "read":
|
|
fname = extra_args[0] if extra_args else ""
|
|
if not fname:
|
|
print("Usage: ./ceo.py read <filename>\n")
|
|
return
|
|
cmd_read(fname)
|
|
return
|
|
|
|
if cmd == "listen":
|
|
cmd_listen(config)
|
|
return
|
|
|
|
if cmd == "build":
|
|
project = extra_args[0] if extra_args else ""
|
|
if not project:
|
|
print("Usage: ./ceo.py build <project_name>\n")
|
|
return
|
|
cmd_build(config, project)
|
|
return
|
|
|
|
if cmd == "unblock":
|
|
tid = extra_args[0] if extra_args else ""
|
|
if not tid:
|
|
print("Usage: ./ceo.py unblock <thread_id> [--reassign agent]\n")
|
|
return
|
|
cmd_unblock(config, tid, parsed.reassign)
|
|
return
|
|
|
|
# ── Fallback: treat as free-form directive ──
|
|
full_message = f"{cmd} {' '.join(extra_args)}".strip()
|
|
cmd_directive(config, full_message)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|