#!/usr/bin/env node
/**
* SF Parallel Worker Monitor
*
* Real-time TUI dashboard for monitoring parallel SF auto-mode workers.
* Zero external dependencies — uses raw ANSI escape codes, Node.js builtins,
* and the shared SF monitor projection store.
*
* Usage:
* node scripts/parallel-monitor.mjs # live dashboard, 5s refresh
* node scripts/parallel-monitor.mjs --interval 3 # faster refresh
* node scripts/parallel-monitor.mjs --once # single snapshot, then exit
* node scripts/parallel-monitor.mjs --heal # auto-respawn dead workers
* node scripts/parallel-monitor.mjs --heal --heal-retries 5 --heal-cooldown 60
*
* Options:
* --interval <sec> Refresh interval in seconds (default: 5)
* --once Render once and exit (useful for scripting/piping)
* --heal Auto-respawn dead workers (opt-in, off by default)
* --heal-retries <n> Max respawn attempts per worker (default: 3)
* --heal-cooldown <sec> Seconds between respawn attempts (default: 30)
* --dir <path> Status file directory (default: .sf/parallel)
* --root <path> Project root (default: cwd)
*
* Data sources:
* .sf/parallel/M0xx.status.json — heartbeat, cost, state (written by orchestrator)
* .sf/worktrees/M0xx/.sf/auto.lock — current unit type + ID (written by worker)
* .sf/worktrees/M0xx/.sf/sf.db — task/slice completion (read-only node:sqlite query)
* .sf/parallel/M0xx.stdout.log — NDJSON events (cost extraction, notify messages)
* .sf/parallel/M0xx.stderr.log — error surfacing
*
* Health indicators:
* ● green — PID alive, fresh heartbeat (<30s)
* ● green — PID alive, heartbeat stale (respawned worker, file mtime used as proxy)
* ○ red — PID dead
*
* Self-healing (--heal):
* When a dead worker is detected, the monitor writes a temp shell script and launches
* a new headless auto-mode process in the worker's worktree with the correct env vars.
* Cooldown prevents rapid respawn loops. Gives up after --heal-retries consecutive
* failures. Resets retry count when a worker comes back alive.
*/
import { execSync, spawn, spawnSync } from "node:child_process";
import fs from "node:fs";
import path from "node:path";
import {
queryParallelRecentCompletionRows,
queryParallelSliceProgress,
} from "../src/resources/extensions/sf/parallel-monitor-store.js";
// ─── Configuration ───────────────────────────────────────────────────────────
const args = process.argv.slice(2);
const INTERVAL_SEC = parseInt(getArg("--interval", "5"), 10);
const PARALLEL_DIR = getArg("--dir", ".sf/parallel");
const PROJECT_ROOT = getArg("--root", process.cwd());
const ONE_SHOT = args.includes("--once");
const HEAL_MODE = args.includes("--heal");
const HEAL_MAX_RETRIES = parseInt(getArg("--heal-retries", "3"), 10);
const HEAL_COOLDOWN_SEC = parseInt(getArg("--heal-cooldown", "30"), 10);
// Per-worker heal state: { lastAttempt: number, retries: number }
const healState = {};
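// Note: getArg only understands the space-separated form (`--interval 3`);
// `--interval=3` would fall through to the default.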
function getArg(flag, defaultVal) {
const idx = args.indexOf(flag);
return idx !== -1 && args[idx + 1] ? args[idx + 1] : defaultVal;
}
// ─── ANSI Helpers ────────────────────────────────────────────────────────────
const ESC = "\x1b[";
const RESET = `${ESC}0m`;
const BOLD = `${ESC}1m`;
const DIM = `${ESC}2m`;
const _ITALIC = `${ESC}3m`;
const FG = {
black: `${ESC}30m`,
red: `${ESC}31m`,
green: `${ESC}32m`,
yellow: `${ESC}33m`,
blue: `${ESC}34m`,
magenta: `${ESC}35m`,
cyan: `${ESC}36m`,
white: `${ESC}37m`,
gray: `${ESC}90m`,
};
const BG = {
black: `${ESC}40m`,
red: `${ESC}41m`,
green: `${ESC}42m`,
yellow: `${ESC}43m`,
blue: `${ESC}44m`,
white: `${ESC}47m`,
};
// Screen control
const CLEAR_SCREEN = `${ESC}2J${ESC}H`;
const HIDE_CURSOR = `${ESC}?25l`;
const SHOW_CURSOR = `${ESC}?25h`;
const _SAVE_POS = `${ESC}s`;
const _RESTORE_POS = `${ESC}u`;
function _moveTo(row, col) {
return `${ESC}${row};${col}H`;
}
// ─── Data Reading ────────────────────────────────────────────────────────────
function readJsonSafe(filePath) {
try {
return JSON.parse(fs.readFileSync(filePath, "utf-8"));
} catch {
return null;
}
}
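// process.kill(pid, 0) delivers no signal; it only probes whether the PID exists.
// Caveat: a live process owned by another user throws EPERM, which this helper
// reports as dead. Workers are assumed to run as the current user.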
function isPidAlive(pid) {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
function discoverWorkers() {
const dir = path.resolve(PROJECT_ROOT, PARALLEL_DIR);
const worktreeDir = path.resolve(PROJECT_ROOT, ".sf/worktrees");
const mids = new Set();
  // From status files, plus stdout/stderr logs (manually respawned workers may
  // lack status.json)
  if (fs.existsSync(dir)) {
    for (const f of fs.readdirSync(dir)) {
      if (f.endsWith(".status.json")) mids.add(f.replace(".status.json", ""));
      const m = f.match(/^(M\d+)\.(stderr|stdout)\.log$/);
      if (m) mids.add(m[1]);
    }
  }
// From worktree directories that have auto.lock (actively running)
if (fs.existsSync(worktreeDir)) {
for (const d of fs.readdirSync(worktreeDir)) {
if (
d.startsWith("M") &&
fs.existsSync(path.join(worktreeDir, d, ".sf", "auto.lock"))
) {
mids.add(d);
}
}
}
return [...mids].sort();
}
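// Illustrative status.json shape (assumed, inferred from the fields read in
// collectWorkerData; the orchestrator may write more keys):
//   { "pid": 12345, "state": "running", "startedAt": 1746680000000,
//     "lastHeartbeat": 1746680123000, "cost": 1.23, "currentUnit": "T042" }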
function readWorkerStatus(mid) {
const statusPath = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.status.json`,
);
return readJsonSafe(statusPath);
}
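// Illustrative auto.lock shape (assumed, inferred from the fields read below):
//   { "pid": 12345, "unitId": "S01-T03", "unitType": "execute-task",
//     "unitStartedAt": "2026-05-08T04:55:00Z", "startedAt": "2026-05-08T04:00:00Z" }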
function readAutoLock(mid) {
const lockPath = path.resolve(
PROJECT_ROOT,
`.sf/worktrees/${mid}/.sf/auto.lock`,
);
return readJsonSafe(lockPath);
}
function readRecentEvents(mid, maxLines = 5) {
const stdoutPath = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stdout.log`,
);
const notifications = [];
const errors = [];
// Parse NDJSON notify events from stdout log
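  // Tail-read at most the last 32 KiB so polling stays cheap on long logs; the
  // first line of the tail may be cut mid-JSON, which the per-line try/catch skips.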
if (fs.existsSync(stdoutPath)) {
try {
const stat = fs.statSync(stdoutPath);
const readSize = Math.min(stat.size, 32768);
const fd = fs.openSync(stdoutPath, "r");
const buf = Buffer.alloc(readSize);
fs.readSync(fd, buf, 0, readSize, Math.max(0, stat.size - readSize));
fs.closeSync(fd);
const content = buf.toString("utf-8");
const lines = content.trim().split("\n").slice(-100);
for (const line of lines) {
try {
const obj = JSON.parse(line);
if (obj.method === "notify" && obj.message) {
notifications.push({ ts: Date.now(), msg: obj.message, mid });
}
} catch {
/* skip */
}
}
} catch {
/* skip */
}
}
// Parse errors from stderr log — only new bytes since monitor started
const stderrPath = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stderr.log`,
);
if (fs.existsSync(stderrPath)) {
try {
const stat = fs.statSync(stderrPath);
// Record baseline on first read — skip pre-existing errors
if (!(mid in stderrBaselines)) {
stderrBaselines[mid] = stat.size;
}
const baseline = stderrBaselines[mid];
const newBytes = stat.size - baseline;
if (newBytes > 0) {
const readSize = Math.min(newBytes, 4096);
const fd = fs.openSync(stderrPath, "r");
const buf = Buffer.alloc(readSize);
fs.readSync(
fd,
buf,
0,
readSize,
Math.max(baseline, stat.size - readSize),
);
fs.closeSync(fd);
const content = buf.toString("utf-8");
const lines = content.trim().split("\n").slice(-10);
for (const line of lines) {
if (
line.includes("error") ||
line.includes("Error") ||
line.includes("WARN") ||
line.includes("exited")
) {
errors.push({
ts: Date.now(),
msg: line.trim(),
mid,
isError: true,
});
}
}
}
} catch {
/* skip */
}
}
return {
notifications: notifications.slice(-maxLines),
errors: errors.slice(-3),
};
}
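// Illustrative message_end NDJSON line (assumed; only the fields consumed below
// are shown):
//   {"type":"message_end","message":{"usage":{"cost":{"total":0.0123}}}}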
/**
* Extract accumulated cost from NDJSON stdout log (fallback when status.json is missing).
* Sums `message.usage.cost.total` from all `message_end` events.
*/
function extractCostFromNdjson(mid) {
const stdoutPath = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stdout.log`,
);
if (!fs.existsSync(stdoutPath)) return 0;
try {
const content = fs.readFileSync(stdoutPath, "utf-8");
let total = 0;
for (const line of content.split("\n")) {
if (!line.includes("message_end")) continue;
try {
const obj = JSON.parse(line);
if (obj.type === "message_end") {
const cost = obj.message?.usage?.cost?.total;
if (typeof cost === "number") total += cost;
}
} catch {
/* skip */
}
}
return total;
} catch {
return 0;
}
}
// ─── Self-Healing ────────────────────────────────────────────────────────────
// Auto-detect the SF loader path — works across npm global, homebrew, and local installs
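// Note: import.meta.dirname (used below) requires Node 20.11+ / 21.2+.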
function findSfLoader() {
// 1. Check if we're running from inside the singularity-forge repo itself
const repoLoader = path.resolve(
import.meta.dirname,
"..",
"dist",
"loader.js",
);
if (fs.existsSync(repoLoader)) return repoLoader;
// 2. Check common global install locations
try {
const globalRoot = execSync("npm root -g", {
encoding: "utf-8",
timeout: 3000,
}).trim();
const candidates = [
path.join(globalRoot, "singularity-forge", "dist", "loader.js"),
path.join(globalRoot, "@sf", "pi", "dist", "loader.js"),
];
for (const c of candidates) {
if (fs.existsSync(c)) return c;
}
} catch {
/* skip */
}
// 3. Try `which sf` and resolve symlink
try {
const pathLookup = process.platform === "win32" ? "where.exe" : "which";
const lookupArgs = ["sf"];
const result = spawnSync(pathLookup, lookupArgs, {
encoding: "utf-8",
timeout: 3000,
});
const bin =
result.status === 0 ? result.stdout.trim().split(/\r?\n/)[0]?.trim() : "";
if (bin) {
const realBin = fs.realpathSync(bin);
const loader = path.resolve(
path.dirname(realBin),
"..",
"dist",
"loader.js",
);
if (fs.existsSync(loader)) return loader;
}
} catch {
/* skip */
}
return null;
}
const SF_LOADER = findSfLoader();
/**
* Respawn a dead worker. Returns the new PID or null on failure.
* Uses a detached Node child with log file descriptors so the child is fully detached.
*/
function respawnWorker(mid) {
const worktreeDir = path.resolve(PROJECT_ROOT, `.sf/worktrees/${mid}`);
  if (!fs.existsSync(worktreeDir)) return null;
  if (!SF_LOADER || !fs.existsSync(SF_LOADER)) return null;
const stdoutLog = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stdout.log`,
);
const stderrLog = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stderr.log`,
);
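  // Open the log fds in the parent and hand them to the child as stdio; together
  // with detached + unref below, the respawned worker outlives this monitor.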
let stdoutFd;
let stderrFd;
try {
fs.mkdirSync(path.dirname(stdoutLog), { recursive: true });
stdoutFd = fs.openSync(stdoutLog, "a");
stderrFd = fs.openSync(stderrLog, "a");
const child = spawn(
process.execPath,
[SF_LOADER, "headless", "--json", "auto"],
{
cwd: worktreeDir,
detached: true,
env: {
...process.env,
SF_MILESTONE_LOCK: mid,
SF_PROJECT_ROOT: PROJECT_ROOT,
SF_PARALLEL_WORKER: "1",
},
stdio: ["ignore", stdoutFd, stderrFd],
windowsHide: true,
},
);
child.unref();
return child.pid ?? null;
} catch (_err) {
return null;
} finally {
if (stdoutFd !== undefined) {
try {
fs.closeSync(stdoutFd);
} catch {}
}
if (stderrFd !== undefined) {
try {
fs.closeSync(stderrFd);
} catch {}
}
}
}
/**
* Check all workers and respawn dead ones if --heal is active.
* Returns an array of heal events for the event feed.
*/
function healWorkers(workers) {
if (!HEAL_MODE) return [];
const events = [];
const now = Date.now();
for (const wk of workers) {
if (wk.alive) {
// Worker is alive — reset its heal state on success
if (healState[wk.mid]) {
healState[wk.mid].retries = 0;
}
continue;
}
// Worker is dead — check if we should attempt a respawn
if (!healState[wk.mid]) {
healState[wk.mid] = { lastAttempt: 0, retries: 0 };
}
const hs = healState[wk.mid];
// Give up after max retries
if (hs.retries >= HEAL_MAX_RETRIES) {
if (hs.retries === HEAL_MAX_RETRIES) {
events.push({
ts: now,
mid: wk.mid,
msg: `${wk.mid}: gave up after ${HEAL_MAX_RETRIES} respawn attempts`,
});
hs.retries++; // Increment past max so this message only shows once
}
continue;
}
// Cooldown — don't respawn too quickly
const elapsed = now - hs.lastAttempt;
if (elapsed < HEAL_COOLDOWN_SEC * 1000) {
const _remaining = Math.ceil((HEAL_COOLDOWN_SEC * 1000 - elapsed) / 1000);
// Don't spam the feed — only note on first cooldown tick
continue;
}
// Check the milestone isn't already complete
const allSlicesDone =
wk.slices.length > 0 && wk.slices.every((s) => s.status === "complete");
if (allSlicesDone) {
events.push({
ts: now,
mid: wk.mid,
msg: `${wk.mid}: all slices complete, no respawn needed`,
});
hs.retries = HEAL_MAX_RETRIES + 1; // Don't try again
continue;
}
// Attempt respawn
hs.lastAttempt = now;
hs.retries++;
events.push({
ts: now,
mid: wk.mid,
msg: `🔄 ${wk.mid}: respawning (attempt ${hs.retries}/${HEAL_MAX_RETRIES})...`,
});
const newPid = respawnWorker(wk.mid);
if (newPid) {
events.push({
ts: now,
mid: wk.mid,
msg: `🟢 ${wk.mid}: respawned as PID ${newPid}`,
});
// Reset stderr baseline so we don't show old errors
delete stderrBaselines[wk.mid];
} else {
events.push({
ts: now,
mid: wk.mid,
isError: true,
msg: `${wk.mid}: respawn failed`,
});
}
}
return events;
}
// ─── Formatting Helpers ──────────────────────────────────────────────────────
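// e.g. formatDuration(3_723_000) → "1h02m", formatDuration(754_000) → "12m34s"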
function formatDuration(ms) {
if (!ms || ms < 0) return "--:--";
const totalSec = Math.floor(ms / 1000);
const h = Math.floor(totalSec / 3600);
const m = Math.floor((totalSec % 3600) / 60);
const s = totalSec % 60;
if (h > 0) return `${h}h${String(m).padStart(2, "0")}m`;
return `${String(m).padStart(2, "0")}m${String(s).padStart(2, "0")}s`;
}
function formatCost(cost) {
if (cost == null) return "$-.--";
return `$${cost.toFixed(2)}`;
}
function healthColor(_heartbeatAge, alive) {
  if (!alive) return "red";
  // PID alive is the strongest signal — worker is running. A stale heartbeat
  // usually means a respawned worker (no orchestrator writing status.json);
  // show green either way, since the headless idle timeout (120s) kills stuck workers.
  return "green";
}
function healthIcon(color) {
switch (color) {
case "green":
return "●";
case "yellow":
return "◐";
case "red":
return "○";
default:
return "?";
}
}
function unitTypeLabel(unitType) {
const labels = {
"execute-task": "EXEC",
"research-slice": "RSRCH",
"plan-slice": "PLAN",
"complete-slice": "DONE",
"complete-task": "DONE",
reassess: "ASSESS",
validate: "VALID",
};
return labels[unitType] || (unitType || "---").toUpperCase().slice(0, 5);
}
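// e.g. progressBar(3, 10, 20) → "██████░░░░░░░░░░░░░░" (6 filled, 14 empty)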
function progressBar(done, total, width = 20) {
  if (total === 0) return "░".repeat(width);
const filled = Math.round((done / total) * width);
const empty = width - filled;
return `${"█".repeat(filled)}${"░".repeat(empty)}`;
}
function _pad(str, width) {
const s = String(str);
return s.length >= width
? s.slice(0, width)
: s + " ".repeat(width - s.length);
}
function _rpad(str, width) {
const s = String(str);
return s.length >= width
? s.slice(0, width)
: " ".repeat(width - s.length) + s;
}
function truncate(str, maxLen) {
if (str.length <= maxLen) return str;
return str.slice(0, maxLen - 1) + "…";
}
/**
* Get recently completed tasks/slices from the worktree DB for the event feed.
*/
function queryRecentCompletions(mid) {
return queryParallelRecentCompletionRows(PROJECT_ROOT, mid).map((row) => ({
ts: row.completedAt ? new Date(row.completedAt).getTime() : Date.now(),
msg: `${mid}/${row.sliceId}/${row.taskId}${row.oneLiner ? ": " + row.oneLiner : ""}`,
mid,
}));
}
// ─── Rendering ───────────────────────────────────────────────────────────────
// Terminal width is re-read on every render via termCols() so resizes take effect.
function termCols() {
  return Math.max(process.stdout.columns || 100, 80);
}
const _ROWS = Math.max(process.stdout.rows || 40, 20);
let lastEventFeed = []; // Persisted across renders
const stderrBaselines = {}; // mid → file size at monitor startup (skip pre-existing errors)
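/**
 * Build one record per worker, preferring fresher sources: auto.lock PID over
 * status.json PID, status heartbeat over log mtime, status cost over NDJSON sums.
 */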
function collectWorkerData() {
const mids = discoverWorkers();
const workers = [];
for (const mid of mids) {
const status = readWorkerStatus(mid);
const lock = readAutoLock(mid);
const slices = queryParallelSliceProgress(PROJECT_ROOT, mid);
const { notifications, errors } = readRecentEvents(mid, 3);
// Prefer auto.lock PID (written by the running worker) over status.json PID
// (written by the orchestrator, stale after respawn)
const pid = lock?.pid || status?.pid;
const alive = pid ? isPidAlive(pid) : false;
// Heartbeat: prefer status.json if its PID matches (orchestrator-managed),
// otherwise fall back to stdout.log mtime (respawned workers write NDJSON continuously)
let heartbeatAge = Infinity;
const statusPidMatches = status?.pid && status.pid === pid;
if (status?.lastHeartbeat && statusPidMatches) {
heartbeatAge = Date.now() - status.lastHeartbeat;
} else {
// Check stdout/stderr log mtime as proxy heartbeat
const stdoutLog = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stdout.log`,
);
const stderrLog = path.resolve(
PROJECT_ROOT,
PARALLEL_DIR,
`${mid}.stderr.log`,
);
try {
const mtimes = [];
if (fs.existsSync(stdoutLog))
mtimes.push(fs.statSync(stdoutLog).mtimeMs);
if (fs.existsSync(stderrLog))
mtimes.push(fs.statSync(stderrLog).mtimeMs);
if (lock?.unitStartedAt)
mtimes.push(new Date(lock.unitStartedAt).getTime());
if (mtimes.length > 0) heartbeatAge = Date.now() - Math.max(...mtimes);
} catch {
/* skip */
}
}
// Cost: prefer status.json, fall back to NDJSON log parsing
let cost = status?.cost || 0;
if (cost === 0) {
cost = extractCostFromNdjson(mid);
}
const totalTasks = slices.reduce((sum, s) => sum + s.total, 0);
const doneTasks = slices.reduce((sum, s) => sum + s.done, 0);
const doneSlices = slices.filter((s) => s.status === "complete").length;
const totalSlices = slices.length;
// Current unit from auto.lock (more accurate than status.json currentUnit)
const currentUnit = lock?.unitId || status?.currentUnit || null;
const unitType = lock?.unitType || null;
const unitStarted = lock?.unitStartedAt
? new Date(lock.unitStartedAt).getTime()
: null;
    // If there is no lock and the worker is dead, render() shows "stopped"
    // rather than a misleading unit label.
const elapsed = status?.startedAt
? Date.now() - status.startedAt
: lock?.startedAt
? Date.now() - new Date(lock.startedAt).getTime()
: 0;
workers.push({
mid,
pid,
alive,
state: alive ? "running" : status?.state || "dead",
cost,
heartbeatAge,
health: healthColor(heartbeatAge, alive),
currentUnit,
unitType,
unitElapsed: unitStarted ? Date.now() - unitStarted : 0,
elapsed,
totalTasks,
doneTasks,
totalSlices,
doneSlices,
slices,
notifications,
errors,
});
}
return workers;
}
function render(workers) {
const buf = [];
  const w = termCols();
// ── Header ──
buf.push("");
const title = " SF Parallel Monitor ";
const titlePad = Math.max(0, Math.floor((w - title.length) / 2));
buf.push(
`${" ".repeat(titlePad)}${BOLD}${BG.blue}${FG.white}${title}${RESET}`,
);
const now = new Date().toLocaleTimeString();
const totalCost = workers.reduce((s, w) => s + w.cost, 0);
const aliveCount = workers.filter((w) => w.alive).length;
  const healTag = HEAL_MODE ? ` │ ${FG.green}⚕ heal${RESET}${DIM}` : "";
  buf.push(
    `${DIM} ${now} │ ${aliveCount}/${workers.length} alive │ Total: ${RESET}${BOLD}${formatCost(totalCost)}${RESET}${DIM} │ Refresh: ${INTERVAL_SEC}s${healTag}${RESET}`,
  );
buf.push(`${DIM}${"─".repeat(w)}${RESET}`);
// ── Worker Panels ──
if (workers.length === 0) {
buf.push("");
buf.push(` ${FG.yellow}No workers found in ${PARALLEL_DIR}/${RESET}`);
buf.push(` ${DIM}Waiting for .sf/parallel/*.status.json files...${RESET}`);
} else {
for (const wk of workers) {
buf.push("");
// Worker header: milestone ID + health + state
const icon = healthIcon(wk.health);
const hc = FG[wk.health];
const stateLabel = wk.alive
? wk.state === "running"
? `${FG.green}RUNNING${RESET}`
: `${FG.yellow}${wk.state.toUpperCase()}${RESET}`
: `${FG.red}${BOLD}DEAD${RESET}`;
const heartbeatText =
wk.heartbeatAge === Infinity
? "never"
: formatDuration(wk.heartbeatAge) + " ago";
      buf.push(
        ` ${hc}${icon}${RESET} ${BOLD}${wk.mid}${RESET} ${stateLabel} ${DIM}PID ${wk.pid || "?"}${RESET} ${DIM}│${RESET} ${DIM}elapsed${RESET} ${formatDuration(wk.elapsed)} ${DIM}│${RESET} ${DIM}cost${RESET} ${BOLD}${formatCost(wk.cost)}${RESET} ${DIM}│${RESET} ${DIM}heartbeat${RESET} ${hc}${heartbeatText}${RESET}`,
      );
// Current unit
if (wk.currentUnit) {
const phaseColor =
wk.unitType === "execute-task"
? FG.cyan
: wk.unitType === "research-slice"
? FG.magenta
: wk.unitType === "plan-slice"
? FG.blue
: wk.unitType?.includes("complete")
? FG.green
: FG.white;
        buf.push(
          ` ${DIM}▸${RESET} ${phaseColor}${unitTypeLabel(wk.unitType)}${RESET} ${wk.currentUnit} ${DIM}(${formatDuration(wk.unitElapsed)})${RESET}`,
        );
} else if (!wk.alive) {
buf.push(` ${DIM}${FG.red}stopped${RESET}`);
} else {
buf.push(` ${DIM}▸ idle / between units${RESET}`);
}
// Slice progress grid
if (wk.slices.length > 0) {
const sliceChips = wk.slices.map((s) => {
const pct = s.total > 0 ? s.done / s.total : 0;
let color;
if (s.status === "complete") color = FG.green;
else if (pct > 0) color = FG.yellow;
else color = FG.gray;
const label = `${s.id}:${s.done}/${s.total}`;
return `${color}${label}${RESET}`;
});
buf.push(` ${DIM}slices${RESET} ${sliceChips.join(" ")}`);
// Overall progress bar
const bar = progressBar(wk.doneTasks, wk.totalTasks, 30);
const pctStr =
wk.totalTasks > 0
? `${Math.round((wk.doneTasks / wk.totalTasks) * 100)}%`
: "0%";
        buf.push(
          ` ${DIM}tasks${RESET} ${FG.green}${bar}${RESET} ${wk.doneTasks}/${wk.totalTasks} ${DIM}(${pctStr})${RESET} ${DIM}│${RESET} ${DIM}slices done${RESET} ${wk.doneSlices}/${wk.totalSlices}`,
        );
}
// Recent errors from this worker
if (wk.errors.length > 0) {
for (const err of wk.errors.slice(-2)) {
buf.push(` ${FG.red}${truncate(err.msg, w - 10)}${RESET}`);
}
}
}
}
// ── Separator ──
buf.push("");
buf.push(`${DIM}${"─".repeat(w)}${RESET}`);
// ── Event Feed ──
buf.push(` ${BOLD}Recent Events${RESET}`);
// Collect new notification events from all workers
for (const wk of workers) {
for (const evt of wk.notifications) {
if (!lastEventFeed.some((e) => e.msg === evt.msg && e.mid === evt.mid)) {
lastEventFeed.push(evt);
}
}
}
// Also add recent task completions from the DB
for (const wk of workers) {
const completions = queryRecentCompletions(wk.mid);
for (const evt of completions) {
if (!lastEventFeed.some((e) => e.msg === evt.msg)) {
lastEventFeed.push(evt);
}
}
}
// Sort by timestamp and keep last 10
lastEventFeed.sort((a, b) => a.ts - b.ts);
lastEventFeed = lastEventFeed.slice(-10);
if (lastEventFeed.length === 0) {
buf.push(` ${DIM}No events yet...${RESET}`);
} else {
for (const evt of lastEventFeed.slice(-6)) {
const midTag = `${FG.cyan}${evt.mid}${RESET}`;
      buf.push(` ${DIM}•${RESET} ${midTag} ${truncate(evt.msg, w - 12)}`);
}
}
// ── Completion Check ──
const allDone = workers.length > 0 && workers.every((w) => !w.alive);
if (allDone) {
buf.push("");
buf.push(`${DIM}${"─".repeat(w)}${RESET}`);
buf.push("");
const doneMsg = " ALL WORKERS COMPLETE ";
const donePad = Math.max(0, Math.floor((w - doneMsg.length) / 2));
buf.push(
`${" ".repeat(donePad)}${BOLD}${BG.green}${FG.black}${doneMsg}${RESET}`,
);
buf.push("");
for (const wk of workers) {
      buf.push(
        ` ${wk.mid} ${formatCost(wk.cost)} ${DIM}│${RESET} ${wk.doneSlices}/${wk.totalSlices} slices ${wk.doneTasks}/${wk.totalTasks} tasks ${DIM}│${RESET} ${formatDuration(wk.elapsed)}`,
      );
}
const totalCostFinal = workers.reduce((s, w) => s + w.cost, 0);
buf.push(` ${BOLD}Total: ${formatCost(totalCostFinal)}${RESET}`);
}
// ── Footer ──
buf.push("");
const healInfo = HEAL_MODE
? ` │ heal: ${HEAL_COOLDOWN_SEC}s cooldown, ${HEAL_MAX_RETRIES} max retries`
: "";
buf.push(
` ${DIM}Ctrl+C to exit${allDone ? " (monitoring stopped)" : ""}${healInfo}${RESET}`,
);
// Write to screen
process.stdout.write(CLEAR_SCREEN);
process.stdout.write(buf.join("\n") + "\n");
return allDone;
}
// ─── Main Loop ───────────────────────────────────────────────────────────────
function main() {
process.stdout.write(HIDE_CURSOR);
  // Handle resize
  process.stdout.on("resize", () => {
    // Terminal width is re-read via termCols() on the next render
  });
// Graceful exit
const cleanup = () => {
process.stdout.write(SHOW_CURSOR);
process.stdout.write(CLEAR_SCREEN);
console.log("Monitor stopped.");
process.exit(0);
};
process.on("SIGINT", cleanup);
process.on("SIGTERM", cleanup);
// Initial render
  const workers = collectWorkerData();
  const healEvents = healWorkers(workers);
  for (const evt of healEvents) lastEventFeed.push(evt);
  let done = render(workers);
  // Don't exit on an all-dead first snapshot if we just respawned workers
  if ((done && healEvents.length === 0) || ONE_SHOT) {
    process.stdout.write(SHOW_CURSOR);
    return;
  }
// Refresh loop
const timer = setInterval(() => {
try {
const workers = collectWorkerData();
const healEvents = healWorkers(workers);
for (const evt of healEvents) lastEventFeed.push(evt);
      done = render(workers);
      // As above, an all-dead render doesn't end the loop if heal just respawned
      if (done && healEvents.length === 0) {
clearInterval(timer);
// Keep showing final state for 3 seconds then exit
setTimeout(() => {
process.stdout.write(SHOW_CURSOR);
process.exit(0);
}, 3000);
}
} catch (err) {
// Don't crash the monitor on transient read errors
process.stderr.write(`Monitor error: ${err.message}\n`);
}
}, INTERVAL_SEC * 1000);
}
main();