fix: move unit runtime under uok ownership

This commit is contained in:
Mikael Hugo 2026-05-06 07:02:28 +02:00
parent 42c651d106
commit 500a9d1c1d
19 changed files with 825 additions and 679 deletions

View file

@ -2,7 +2,7 @@
## Purpose
Singularity Forge (SF) is an autonomous agent orchestration system. It runs long-horizon coding work as a state machine: milestones → slices → tasks. Each dispatch unit runs a fresh AI context, writes its output to disk, then terminates. A deterministic controller (not an LLM) reads disk state and decides what to dispatch next. The user is the end-gate — autonomous mode delivers work to human review, it does not merge to production unattended.
Singularity Forge (SF) is an autonomous agent orchestration system. It runs long-horizon coding work through the Unified Operation Kernel (UOK): milestones → slices → tasks. Each dispatch unit runs a fresh AI context, writes its output to disk, then terminates. UOK owns lifecycle, recovery, and the DB-backed run ledger; runtime files under `.sf/runtime/` are projections for query, UI, and compatibility. A deterministic controller (not an LLM) reads canonical state and decides what to dispatch next. The user is the end-gate — autonomous mode delivers work to human review, it does not merge to production unattended.
## Codemap
@ -13,7 +13,7 @@ Singularity Forge (SF) is an autonomous agent orchestration system. It runs long
| `src/headless-events.ts` | Transcript event parsing and notification routing |
| `src/extension-registry.ts` | Registers SF as a Pi coding-agent extension |
| `src/resources/extensions/sf/` | All SF extension source (TypeScript) |
| `src/resources/extensions/sf/auto/` | Autonomous workflow orchestrator (state machine, dispatch, planning) |
| `src/resources/extensions/sf/auto/` | Autonomous workflow orchestrator (UOK lifecycle, dispatch, planning) |
| `src/resources/extensions/sf/bootstrap/` | Context injection, system prompt assembly |
| `src/resources/extensions/sf/prompts/` | Prompt templates (`.md`, loaded by `prompt-loader.ts`) |
| `src/resources/extensions/sf/tests/` | Unit and integration tests |
@ -60,11 +60,11 @@ The symlink case uses a blanket `.sf` gitignore pattern (git cannot traverse sym
## Key flows
**Autonomous dispatch loop** (`src/resources/extensions/sf/auto/`):
1. `deriveState()` reads disk and produces a typed state snapshot
2. Controller selects the next dispatch unit (research, plan, implement, verify, etc.)
1. UOK reconciles the DB-backed ledger, projections, and runtime diagnostics into a typed state snapshot
2. Controller selects the next dispatch unit (research, plan, implement, verify, etc.) from canonical state
3. A fresh agent context is started with the task plan injected via `system-context.ts`
4. Agent writes artifacts to disk, commits, exits
5. Loop repeats until milestone completes or a gate fails
5. UOK records completion/recovery, updates projections, and repeats until milestone completes or a gate fails
**System context assembly** (`bootstrap/system-context.ts`):
`PREFERENCES.md``KNOWLEDGE.md``ARCHITECTURE.md``CODEBASE.md` → code intelligence → memories → worktree/VCS blocks
@ -74,7 +74,7 @@ All file writes in autonomous mode pass through a gate. Protected files (CLAUDE.
## Invariants
- The state machine (controller) is pure TypeScript — no LLM decisions in the dispatch loop itself.
- UOK and the dispatch controller are pure TypeScript — no LLM decisions in the dispatch loop itself.
- Each dispatch unit runs in a fresh context — no cross-turn state accumulation.
- Planning artifacts are tracked in git; runtime artifacts are never committed.
- `SF_RUNTIME_PATTERNS` in `gitignore.ts` is the canonical source of truth for runtime paths. `git-service.ts` (`RUNTIME_EXCLUSION_PATHS`) and `worktree-manager.ts` (`SKIP_*` arrays) must stay synchronized with it.

View file

@ -231,7 +231,7 @@ This is what makes SF different. Run it, walk away, come back to built software.
/sf autonomous
```
Autonomous mode is a state machine driven by files on disk. It reads `.sf/STATE.md`, determines the next unit of work, creates a fresh agent session, injects a focused prompt with all relevant context pre-inlined, and lets the LLM execute. When the LLM finishes, autonomous mode reads disk state again and dispatches the next unit. Legacy `/sf auto` remains accepted only for compatibility; new prompts and docs use `/sf autonomous`.
Autonomous mode is governed by the Unified Operation Kernel (UOK), not by the LLM or a loose file loop. UOK reads canonical project state, records each run in the DB-backed ledger, projects runtime files for query/UI/backcompat, determines the next unit of work, creates a fresh agent session, injects a focused prompt with all relevant context pre-inlined, and lets the LLM execute. When the LLM finishes, autonomous mode reconciles the UOK ledger and projections before dispatching the next unit. Legacy `/sf auto` remains accepted only for compatibility; new prompts and docs use `/sf autonomous`.
**What happens under the hood:**
@ -261,7 +261,7 @@ Autonomous mode is a state machine driven by files on disk. It reads `.sf/STATE.
### `/sf` and `/sf next` — Step Mode
By default, `/sf` runs in **step mode**: the same state machine as autonomous mode, but it pauses between units with a wizard showing what completed and what's next. You advance one step at a time, review the output, and continue when ready.
By default, `/sf` runs in **step mode**: the same UOK-governed dispatch loop as autonomous mode, but it pauses between units with a wizard showing what completed and what's next. You advance one step at a time, review the output, and continue when ready.
- **No `.sf/` directory** → Start a new project. Discussion flow captures your vision, constraints, and preferences.
- **Milestone exists, no roadmap** → Discuss or research the milestone.
@ -309,7 +309,7 @@ sf
SF opens an interactive agent session. From there, you have two ways to work:
**`/sf` — step mode.** Type `/sf` and SF executes one unit of work at a time, pausing between each with a wizard showing what completed and what's next. Same state machine as autonomous mode, but you stay in the loop. No project yet? It starts the discussion flow. Roadmap exists? It plans or executes the next step.
**`/sf` — step mode.** Type `/sf` and SF executes one unit of work at a time, pausing between each with a wizard showing what completed and what's next. Same UOK lifecycle and recovery model as autonomous mode, but you stay in the loop. No project yet? It starts the discussion flow. Roadmap exists? It plans or executes the next step.
**`/sf autonomous` — autonomous mode.** Type `/sf autonomous` and walk away. SF researches, plans, executes, verifies, commits, and advances through every slice until the milestone is complete. Fresh context window per task. No babysitting.

View file

@ -56,7 +56,7 @@ Every dispatch creates a new agent session. The LLM starts with a clean context
| Extension | What It Provides |
|-----------|-----------------|
| **SF** | Core workflow engine — auto mode, state machine, commands, dashboard |
| **SF** | Core workflow engine — UOK-governed auto mode, commands, dashboard |
| **Browser Tools** | Playwright-based browser automation — navigation, forms, screenshots, PDF export, device emulation, visual regression, structured data extraction, route mocking, accessibility tree inspection, and semantic actions |
| **Search the Web** | Brave Search, Tavily, or Jina page extraction |
| **Google Search** | Gemini-powered web search with AI-synthesized answers |
@ -110,7 +110,7 @@ Performance-critical operations use a Rust N-API engine:
The auto mode dispatch pipeline:
```
1. Read disk state (STATE.md, roadmap, plans)
1. Reconcile canonical project state, the UOK run ledger, and runtime projections
2. Determine next unit type and ID
3. Classify complexity → select model tier
4. Apply budget pressure adjustments
@ -121,7 +121,7 @@ The auto mode dispatch pipeline:
9. Build dispatch prompt (applying inline level compression)
10. Create fresh agent session
11. Inject prompt and let LLM execute
12. On completion: snapshot metrics, verify artifacts, persist state
12. On completion: snapshot metrics, verify artifacts, persist UOK ledger state, update projections
13. Loop to step 1
```
@ -131,7 +131,7 @@ Phase skipping (from token profile) gates steps 2-3: if a phase is skipped, the
| Module | Purpose |
|--------|---------|
| `auto.ts` | Auto-mode state machine and orchestration |
| `auto.ts` | Auto-mode orchestration over the UOK lifecycle and dispatch loop |
| `auto/session.ts` | `AutoSession` class — all mutable auto-mode state in one encapsulated instance |
| `auto-dispatch.ts` | Declarative dispatch table (phase → unit mapping) |
| `auto-idempotency.ts` | Completed-key checks, skip loop detection, key eviction |

View file

@ -4,7 +4,7 @@ Autonomous mode is SF's product-development execution engine for the purpose-to-
## How It Works
Autonomous mode is a **state machine driven by structured state on disk**. It reads `.sf` state, determines the next unit of work, creates a fresh agent session, injects a focused prompt with all relevant context pre-inlined, and lets the LLM execute. When the LLM finishes, autonomous mode reads disk state again and dispatches the next unit. Markdown files are projections for humans when structured state exists.
Autonomous mode is governed by the **Unified Operation Kernel (UOK)**. UOK reads canonical project state, records lifecycle and recovery in the DB-backed ledger, and writes runtime files as projections for query, UI, and compatibility. It determines the next unit of work, creates a fresh agent session, injects a focused prompt with all relevant context pre-inlined, and lets the LLM execute. When the LLM finishes, autonomous mode reconciles the UOK ledger and projections before dispatching the next unit. Markdown files are projections for humans when structured state exists.
### The Loop

View file

@ -3,7 +3,7 @@ title: "Auto mode"
description: "SF's autonomous execution engine — run /sf autonomous, walk away, come back to built software with clean git history."
---
Auto mode is a **state machine driven by files on disk**. It reads `.sf/STATE.md`, determines the next unit of work, creates a fresh agent session with pre-loaded context, and lets the LLM execute. When the LLM finishes, auto mode reads disk state again and dispatches the next unit.
Auto mode is governed by the **Unified Operation Kernel (UOK)**. UOK reads canonical project state, records lifecycle and recovery in the DB-backed ledger, and writes runtime files as projections for query, UI, and compatibility. It determines the next unit of work, creates a fresh agent session with pre-loaded context, and lets the LLM execute. When the LLM finishes, auto mode reconciles the UOK ledger and projections before dispatching the next unit.
## The loop

View file

@ -9,7 +9,7 @@ SF is an autonomous coding agent. Describe what you want built, run `/sf autonom
<CardGroup cols={2}>
<Card title="Autonomous execution" icon="robot">
A state machine reads your project state, dispatches work to an LLM in fresh context windows, and advances through research, planning, execution, and verification — all without manual intervention.
UOK reads your canonical project state, dispatches work to an LLM in fresh context windows, records recovery in the DB-backed ledger, and advances through research, planning, execution, and verification — all without manual intervention.
</Card>
<Card title="Clean git history" icon="code-branch">
Every task produces a conventional commit. Milestones are squash-merged to main. Your `git log` reads like a changelog.

View file

@ -93,7 +93,7 @@ import { isClosedStatus } from "./status-guards.js";
import {
reconcileDurableCompleteUnitRuntimeRecords,
reconcileStaleCompleteSliceRecords,
} from "./unit-runtime.js";
} from "./uok/unit-runtime.js";
import { logError, logWarning } from "./workflow-logger.js";
function safeSetWidget(ctx, key, content, options) {

View file

@ -26,7 +26,7 @@ import {
inspectExecuteTaskDurability,
readUnitRuntimeRecord,
writeUnitRuntimeRecord,
} from "./unit-runtime.js";
} from "./uok/unit-runtime.js";
function relToBase(basePath, path) {
const rel = relative(basePath, path);

View file

@ -36,7 +36,7 @@ import { getMilestoneSlices, getSliceTasks, isDbAvailable } from "./sf-db.js";
import {
readUnitRuntimeRecord,
writeUnitRuntimeRecord,
} from "./unit-runtime.js";
} from "./uok/unit-runtime.js";
import { logError, logWarning } from "./workflow-logger.js";
/**
* Set up all four supervision timers for the current unit:

View file

@ -173,11 +173,10 @@ import {
resetSkillTelemetry,
} from "./skill-telemetry.js";
import { resolveUokFlags } from "./uok/flags.js";
import { runAutoLoopWithUok } from "./uok/kernel.js";
import {
writeParityHeartbeat,
writeParityReport,
} from "./uok/parity-report.js";
recordUokKernelTermination,
runAutoLoopWithUok,
} from "./uok/kernel.js";
import { logWarning, setLogBasePath } from "./workflow-logger.js";
import {
autoCommitCurrentBranch,
@ -351,19 +350,16 @@ function registerSigtermHandler(currentBasePath) {
const prefs = loadEffectiveSFPreferences()?.preferences;
const flags = { ...resolveUokFlags(prefs), enabled: true };
const onSignal = () => {
// Write UOK parity exit heartbeat before process.exit(0) bypasses
// the finally block in runAutoLoopWithUok. Fixes the enter/exit
// mismatch that occurs when auto-mode terminates via signal.
writeParityHeartbeat(currentBasePath, {
ts: new Date().toISOString(),
...(s.currentUokRunId ? { runId: s.currentUokRunId } : {}),
// Record UOK termination before process.exit(0) bypasses the async
// finally block in runAutoLoopWithUok. This updates the DB ledger and
// emits the parity heartbeat from one source of truth.
recordUokKernelTermination({
basePath: currentBasePath,
runId: s.currentUokRunId,
sessionId: s.cmdCtx?.sessionManager?.getSessionId?.(),
path: "uok-kernel",
flags: { ...flags },
phase: "exit",
status: "signal",
});
writeParityReport(currentBasePath);
};
s.sigtermHandler = _registerSigtermHandler(
currentBasePath,

View file

@ -78,10 +78,6 @@ import { getEligibleSlices } from "../slice-parallel-eligibility.js";
import { startSliceParallel } from "../slice-parallel-orchestrator.js";
import { handleProductAudit } from "../tools/product-audit-tool.js";
import { parseUnitId } from "../unit-id.js";
import {
clearUnitRuntimeRecord,
writeUnitRuntimeRecord,
} from "../unit-runtime.js";
import { resolveUokFlags } from "../uok/flags.js";
import { UokGateRunner } from "../uok/gate-runner.js";
import {
@ -89,6 +85,10 @@ import {
isEmptyPlanV2GraphResult,
isMissingFinalizedContextResult,
} from "../uok/plan-v2.js";
import {
clearUnitRuntimeRecord,
writeUnitRuntimeRecord,
} from "../uok/unit-runtime.js";
import {
_resetLogs,
drainAndSummarize,

View file

@ -72,13 +72,13 @@ import {
} from "./session-lock.js";
import { getMilestoneSlices, isDbAvailable } from "./sf-db.js";
import { deriveState } from "./state.js";
import {
clearUnitRuntimeRecord,
listUnitRuntimeRecords,
} from "./unit-runtime.js";
import { resolveUokFlags } from "./uok/flags.js";
import { UokGateRunner } from "./uok/gate-runner.js";
import { ensurePlanV2Graph as ensurePlanningFlowGraph } from "./uok/plan-v2.js";
import {
clearUnitRuntimeRecord,
listUnitRuntimeRecords,
} from "./uok/unit-runtime.js";
import { validateDirectory } from "./validate-directory.js";
import {
getRequiredWorkflowToolsForGuidedUnit,

View file

@ -7,7 +7,7 @@ import { resolveAutoSupervisorConfig } from "../preferences.js";
import {
readUnitRuntimeRecord,
writeUnitRuntimeRecord,
} from "../unit-runtime.js";
} from "../uok/unit-runtime.js";
test("resolveAutoSupervisorConfig provides safe timeout defaults", () => {
const supervisor = resolveAutoSupervisorConfig();

View file

@ -16,12 +16,16 @@ import {
recordUokRunExit,
recordUokRunStart,
} from "../sf-db.js";
import { runAutoLoopWithUok } from "../uok/kernel.js";
import {
recordUokKernelTermination,
runAutoLoopWithUok,
} from "../uok/kernel.js";
import {
buildParityReport,
hasCurrentParityWarning,
parseParityEvents,
UNMATCHED_RUN_STALE_MS,
writeParityReport,
} from "../uok/parity-report.js";
const NOW = Date.parse("2026-05-06T00:00:00.000Z");
@ -413,3 +417,59 @@ test("runAutoLoopWithUok_throw_still_writes_exit_and_current_error_report", asyn
assert.deepEqual(report.criticalMismatches, ["boom"]);
assert.equal(hasCurrentParityWarning(report), true);
});
test("recordUokKernelTermination_marks_started_ledger_run_exited_on_signal", () => {
const projectRoot = makeProject();
openDatabase(":memory:");
recordUokRunStart({
runId: "uok-signal-run",
sessionId: "session-signal",
path: "uok-kernel",
flags: { enabled: true },
startedAt: new Date(NOW - 5_000).toISOString(),
});
const report = recordUokKernelTermination({
basePath: projectRoot,
runId: "uok-signal-run",
sessionId: "session-signal",
flags: { enabled: true },
status: "signal",
});
const runs = getUokRuns();
assert.equal(runs.length, 1);
assert.equal(runs[0].status, "signal");
assert.equal(typeof runs[0].endedAt, "string");
assert.equal(report.missingExitEvents, 0);
assert.equal(hasCurrentParityWarning(report), false);
const events = readProjectParityEvents(projectRoot);
assert.equal(events.length, 1);
assert.equal(events[0].phase, "exit");
assert.equal(events[0].status, "signal");
});
test("writeParityReport_recovers_started_ledger_runs_when_no_auto_lock_owner_exists", () => {
const projectRoot = makeProject();
openDatabase(":memory:");
recordUokRunStart({
runId: "uok-orphan-run",
sessionId: "session-orphan",
path: "uok-kernel",
flags: { enabled: true },
startedAt: new Date(NOW - 5_000).toISOString(),
});
const report = writeParityReport(projectRoot, NOW);
const runs = getUokRuns();
assert.equal(runs.length, 1);
assert.equal(runs[0].status, "recovered");
assert.equal(
runs[0].error,
"uok recovered missing exit: no live auto.lock owner",
);
assert.equal(report.missingExitEvents, 0);
assert.equal(report.freshUnmatchedRuns.length, 0);
assert.equal(hasCurrentParityWarning(report), false);
});

View file

@ -1,621 +1,24 @@
import {
existsSync,
mkdirSync,
readdirSync,
readFileSync,
unlinkSync,
writeFileSync,
} from "node:fs";
import { join } from "node:path";
import {
countMustHavesMentionedInSummary,
loadFile,
parseSummary,
parseTaskPlanMustHaves,
} from "./files.js";
import {
relSliceFile,
relTaskFile,
resolveSliceFile,
resolveTaskFile,
sfRoot,
} from "./paths.js";
import { getSlice, isDbAvailable } from "./sf-db.js";
import { parseUnitId } from "./unit-id.js";
/**
* Lists every durable unit runtime status in FSM order.
* unit-runtime.ts Barrel re-export for the UOK unit-runtime projection.
*
* Purpose: give dispatch, recovery, and query surfaces one canonical state
* vocabulary so terminal units cannot be redispatched by ambiguous legacy phases.
*
* Consumer: auto runtime persistence, unit-runtime tests, headless query summaries.
* The implementation has moved into the UOK subsystem under uok/unit-runtime.js.
* This file preserves the original public API so external consumers
* continue to work without changes.
*/
export const UNIT_RUNTIME_STATUSES = [
"queued",
"claimed",
"running",
"progress",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
"notified",
];
/**
* Names the unit statuses that end an execution attempt.
*
* Purpose: centralize the terminal-state union so retry and notification policy
* does not drift between watchdog recovery and dispatch preview logic.
*
* Consumer: decideUnitRuntimeDispatch and operator-facing query summaries.
*/
export const UNIT_RUNTIME_TERMINAL_STATUSES = [
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
];
/**
* Describes the explicit unit runtime finite-state-machine transitions.
*
* Purpose: make retry, notification, and reset transitions reviewable as data
* instead of implied by ad hoc marker files or legacy phase strings.
*
* Consumer: unit runtime tests, future dispatch/reconciler guards.
*/
export const UNIT_RUNTIME_TRANSITIONS = {
queued: ["claimed", "cancelled"],
claimed: ["running", "stale", "cancelled"],
running: [
"progress",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
],
progress: [
"running",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
],
completed: ["notified"],
failed: ["queued", "notified"],
blocked: ["notified"],
cancelled: ["notified"],
stale: ["queued", "notified"],
"runaway-recovered": ["queued", "notified"],
notified: ["queued"],
};
const DEFAULT_UNIT_RUNTIME_MAX_RETRIES = 1;
const RETRYABLE_TERMINAL_STATUSES = new Set([
"failed",
"stale",
"runaway-recovered",
]);
function hasUpdate(updates, key) {
return Object.hasOwn(updates, key);
}
function phaseForStatus(status) {
switch (status) {
case "queued":
case "claimed":
case "running":
return "dispatched";
case "progress":
return "wrapup-warning-sent";
case "completed":
return "finalized";
default:
return status;
}
}
function inferStatusFromPhase(phase, record) {
if (UNIT_RUNTIME_STATUSES.includes(phase)) {
return phase;
}
switch (phase) {
case "dispatched":
return "running";
case "wrapup-warning-sent":
case "runaway-warning-sent":
case "runaway-final-warning-sent":
case "recovered":
return "progress";
case "timeout":
return "stale";
case "finalized":
return "completed";
case "paused":
return record?.runawayGuardPause ? "runaway-recovered" : "blocked";
case "skipped":
return "blocked";
default:
return "running";
}
}
function retryBudgetRemaining(retryCount, maxRetries) {
return Math.max(0, maxRetries - retryCount);
}
/**
* Returns true when a runtime status is terminal for one execution attempt.
*
* Purpose: keep terminal-state checks exhaustive against the exported terminal
* union rather than hard-coded differently at each caller.
*
* Consumer: decideUnitRuntimeDispatch and query summary generation.
*/
export function isTerminalUnitRuntimeStatus(status) {
return UNIT_RUNTIME_TERMINAL_STATUSES.includes(status);
}
/**
* Returns the normalized FSM state embedded in a runtime record.
*
* Purpose: let legacy records with only `phase` still participate in retry and
* query policy while new records persist explicit FSM fields.
*
* Consumer: decideUnitRuntimeDispatch and headless query summaries.
*/
export function getUnitRuntimeState(record) {
const status = record.status ?? inferStatusFromPhase(record.phase, record);
const retryCount = record.retryCount ?? record.recoveryAttempts ?? 0;
const maxRetries = record.maxRetries ?? DEFAULT_UNIT_RUNTIME_MAX_RETRIES;
return {
status,
retryCount,
maxRetries,
lastHeartbeatAt: record.lastHeartbeatAt ?? null,
lastProgressAt: record.lastProgressAt,
lastOutputAt: record.lastOutputAt ?? null,
outputPath: record.outputPath ?? null,
watchdogReason: record.watchdogReason ?? null,
notifiedAt: record.notifiedAt ?? null,
};
}
/**
* Returns true for synthetic units that must be reset before rerun.
*
* Purpose: prevent synthetic orchestration units such as parallel research from
* looping after failure while preserving normal task retry behavior.
*
* Consumer: decideUnitRuntimeDispatch.
*/
export function isSyntheticUnitRuntime(record) {
return (
record.unitType === "synthetic" ||
record.unitId.includes("parallel-research")
);
}
/**
* Decides whether a unit runtime record permits dispatch, retry, notify, or block.
*
* Purpose: enforce retry budgets and explicit reset requirements before callers
* schedule another copy of a failed or stale unit.
*
* Consumer: unit-runtime FSM tests and headless query runtime summaries.
*/
export function decideUnitRuntimeDispatch(record, options = {}) {
if (!record) {
return {
action: "dispatch",
reasonCode: "no-runtime-record",
retryCount: 0,
maxRetries: DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
retryBudgetRemaining: DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
};
}
const state = getUnitRuntimeState(record);
const remaining = retryBudgetRemaining(state.retryCount, state.maxRetries);
const common = {
retryCount: state.retryCount,
maxRetries: state.maxRetries,
retryBudgetRemaining: remaining,
};
if (state.notifiedAt !== null) {
return { action: "skip", reasonCode: "already-notified", ...common };
}
if (state.status === "notified") {
return { action: "skip", reasonCode: "notified", ...common };
}
if (state.status === "queued") {
return { action: "dispatch", reasonCode: "queued", ...common };
}
if (!isTerminalUnitRuntimeStatus(state.status)) {
return { action: "skip", reasonCode: "active-or-claimed", ...common };
}
const synthetic = options.synthetic ?? isSyntheticUnitRuntime(record);
if (synthetic && state.status !== "completed") {
return {
action: "block",
reasonCode: "synthetic-reset-required",
...common,
};
}
if (RETRYABLE_TERMINAL_STATUSES.has(state.status)) {
if (remaining > 0) {
return {
action: "retry",
reasonCode: "retry-budget-available",
...common,
};
}
return { action: "block", reasonCode: "retry-budget-exhausted", ...common };
}
if (
state.status === "completed" ||
state.status === "blocked" ||
state.status === "cancelled"
) {
return {
action: "notify",
reasonCode: "terminal-ready-to-notify",
...common,
};
}
return { action: "skip", reasonCode: "terminal-nonretryable", ...common };
}
function runtimeDir(basePath) {
return join(sfRoot(basePath), "runtime", "units");
}
function runtimePath(basePath, unitType, unitId) {
const sanitizedUnitType = unitType.replace(/[/]/g, "-");
const sanitizedUnitId = unitId.replace(/[/]/g, "-");
return join(
runtimeDir(basePath),
`${sanitizedUnitType}-${sanitizedUnitId}.json`,
);
}
// ─── In-memory runtime record cache ─────────────────────────────────────────
// Avoids repeated disk reads for the same unit within a single dispatch cycle.
const _runtimeCache = new Map();
function readUnitRuntimeRecordFromDisk(path) {
if (!existsSync(path)) return null;
try {
return JSON.parse(readFileSync(path, "utf-8"));
} catch {
return null;
}
}
export function writeUnitRuntimeRecord(
basePath,
unitType,
unitId,
startedAt,
updates = {},
) {
const dir = runtimeDir(basePath);
mkdirSync(dir, { recursive: true });
const path = runtimePath(basePath, unitType, unitId);
const prev = _runtimeCache.get(path) ?? null;
const phase =
updates.phase ??
(updates.status ? phaseForStatus(updates.status) : prev?.phase) ??
"dispatched";
const status =
updates.status ??
(updates.phase || !prev?.status
? inferStatusFromPhase(phase, {
runawayGuardPause:
updates.runawayGuardPause ?? prev?.runawayGuardPause,
})
: prev.status);
const recoveryAttempts = hasUpdate(updates, "recoveryAttempts")
? (updates.recoveryAttempts ?? 0)
: (prev?.recoveryAttempts ?? 0);
const retryCount = hasUpdate(updates, "retryCount")
? (updates.retryCount ?? 0)
: hasUpdate(updates, "recoveryAttempts")
? (updates.recoveryAttempts ?? 0)
: (prev?.retryCount ?? recoveryAttempts ?? 0);
const next = {
version: 1,
unitType,
unitId,
startedAt,
updatedAt: Date.now(),
phase,
status,
wrapupWarningSent:
updates.wrapupWarningSent ?? prev?.wrapupWarningSent ?? false,
continueHereFired:
updates.continueHereFired ?? prev?.continueHereFired ?? false,
timeoutAt: hasUpdate(updates, "timeoutAt")
? (updates.timeoutAt ?? null)
: (prev?.timeoutAt ?? null),
lastHeartbeatAt: hasUpdate(updates, "lastHeartbeatAt")
? (updates.lastHeartbeatAt ?? null)
: (prev?.lastHeartbeatAt ?? startedAt),
lastProgressAt:
updates.lastProgressAt ?? prev?.lastProgressAt ?? Date.now(),
progressCount: updates.progressCount ?? prev?.progressCount ?? 0,
lastProgressKind:
updates.lastProgressKind ?? prev?.lastProgressKind ?? "dispatch",
lastOutputAt: hasUpdate(updates, "lastOutputAt")
? (updates.lastOutputAt ?? null)
: (prev?.lastOutputAt ?? null),
outputPath: hasUpdate(updates, "outputPath")
? (updates.outputPath ?? null)
: (prev?.outputPath ?? null),
watchdogReason: hasUpdate(updates, "watchdogReason")
? (updates.watchdogReason ?? null)
: (prev?.watchdogReason ?? null),
notifiedAt: hasUpdate(updates, "notifiedAt")
? (updates.notifiedAt ?? null)
: (prev?.notifiedAt ?? null),
recovery: updates.recovery ?? prev?.recovery,
recoveryAttempts,
retryCount,
maxRetries:
updates.maxRetries ??
prev?.maxRetries ??
DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
lastRecoveryReason: updates.lastRecoveryReason ?? prev?.lastRecoveryReason,
runawayGuardPause: updates.runawayGuardPause ?? prev?.runawayGuardPause,
};
writeFileSync(path, JSON.stringify(next, null, 2) + "\n", "utf-8");
_runtimeCache.set(path, next);
return next;
}
export function readUnitRuntimeRecord(basePath, unitType, unitId) {
const path = runtimePath(basePath, unitType, unitId);
const cached = _runtimeCache.get(path);
if (cached !== undefined) return cached;
const record = readUnitRuntimeRecordFromDisk(path);
if (record !== null) _runtimeCache.set(path, record);
return record;
}
export function clearUnitRuntimeRecord(basePath, unitType, unitId) {
const path = runtimePath(basePath, unitType, unitId);
_runtimeCache.delete(path);
if (existsSync(path)) unlinkSync(path);
}
/**
* Return all runtime records currently on disk for `basePath`.
* Returns an empty array if the runtime directory does not exist.
*/
export function listUnitRuntimeRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return [];
const results = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
try {
const raw = readFileSync(join(dir, file), "utf-8");
const record = JSON.parse(raw);
results.push(record);
} catch {
// Skip malformed files
}
}
return results;
}
export async function inspectExecuteTaskDurability(basePath, unitId) {
const { milestone: mid, slice: sid, task: tid } = parseUnitId(unitId);
if (!mid || !sid || !tid) return null;
const planAbs = resolveSliceFile(basePath, mid, sid, "PLAN");
const summaryAbs = resolveTaskFile(basePath, mid, sid, tid, "SUMMARY");
const stateAbs = join(sfRoot(basePath), "STATE.md");
const planPath = relSliceFile(basePath, mid, sid, "PLAN");
const summaryPath = relTaskFile(basePath, mid, sid, tid, "SUMMARY");
const planContent = planAbs ? await loadFile(planAbs) : null;
const stateContent = existsSync(stateAbs)
? readFileSync(stateAbs, "utf-8")
: "";
const summaryExists = !!(summaryAbs && existsSync(summaryAbs));
const escapedTid = tid.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const taskChecked =
!!planContent &&
new RegExp(`^- \\[[xX]\\] \\*\\*${escapedTid}:`, "m").test(planContent);
const nextActionAdvanced = !new RegExp(`Execute ${tid}\\b`).test(
stateContent,
);
// Must-have coverage: load task plan and count mentions in summary
let mustHaveCount = 0;
let mustHavesMentionedInSummary = 0;
const taskPlanAbs = resolveTaskFile(basePath, mid, sid, tid, "PLAN");
if (taskPlanAbs) {
const taskPlanContent = await loadFile(taskPlanAbs);
if (taskPlanContent) {
const mustHaves = parseTaskPlanMustHaves(taskPlanContent);
mustHaveCount = mustHaves.length;
if (mustHaveCount > 0 && summaryExists && summaryAbs) {
const summaryContent = await loadFile(summaryAbs);
if (summaryContent) {
mustHavesMentionedInSummary = countMustHavesMentionedInSummary(
mustHaves,
summaryContent,
);
}
}
}
}
return {
planPath,
summaryPath,
summaryExists,
taskChecked,
nextActionAdvanced,
mustHaveCount,
mustHavesMentionedInSummary,
};
}
export function formatExecuteTaskRecoveryStatus(status) {
const missing = [];
if (!status.summaryExists)
missing.push(`summary missing (${status.summaryPath})`);
if (!status.taskChecked)
missing.push(`task checkbox unchecked in ${status.planPath}`);
if (!status.nextActionAdvanced)
missing.push("state next action still points at the timed-out task");
if (
status.mustHaveCount > 0 &&
status.mustHavesMentionedInSummary < status.mustHaveCount
) {
missing.push(
`must-have gap: ${status.mustHavesMentionedInSummary} of ${status.mustHaveCount} must-haves addressed in summary`,
);
}
return missing.length > 0
? missing.join("; ")
: "all durable task artifacts present";
}
// ─── Stale slice runtime record reconciliation ──────────────────────────────
/**
* Clear unit runtime records for complete-slice units that are in a terminal
* non-completed state (cancelled, failed, stale) but whose slice is actually
* complete in the DB and has a valid SUMMARY.md.
*
* Purpose: prevent the pi runtime flow-audit from emitting false-positive
* stale-dispatch warnings for slices that completed successfully on retry.
* The flow-audit reads journal/runtime state but does not check for later
* successful retries or existing artifact files (#sf-moqv5o7h-vaabu6).
*
* Consumer: bootstrapAutoSession in auto-start.ts, called after
* cleanStaleRuntimeUnits.
*/
export function reconcileStaleCompleteSliceRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return { cleared: 0, details: [] };
let cleared = 0;
const details = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
let record;
try {
record = JSON.parse(readFileSync(join(dir, file), "utf-8"));
} catch {
continue;
}
if (record.unitType !== "complete-slice") continue;
const state = getUnitRuntimeState(record);
// Only target terminal non-completed states that could trigger
// flow-audit warnings.
if (
!["cancelled", "failed", "stale", "runaway-recovered"].includes(
state.status,
)
)
continue;
const { milestone: mid, slice: sid } = parseUnitId(record.unitId);
if (!mid || !sid) continue;
// DB check: slice status must be "complete"
let dbComplete = false;
if (isDbAvailable()) {
try {
const sliceRow = getSlice(mid, sid);
dbComplete = sliceRow?.status === "complete";
} catch {
// DB read failure — skip this record rather than risk data loss
continue;
}
}
if (!dbComplete) continue;
// Artifact check: SUMMARY.md must exist with a valid completed_at
const summaryPath = resolveSliceFile(basePath, mid, sid, "SUMMARY");
let artifactValid = false;
if (summaryPath && existsSync(summaryPath)) {
try {
const content = readFileSync(summaryPath, "utf-8");
const summary = parseSummary(content);
artifactValid = !!summary.frontmatter.completed_at;
} catch {
artifactValid = false;
}
}
if (!artifactValid) continue;
// All checks pass — clear the stale runtime record
try {
unlinkSync(join(dir, file));
_runtimeCache.delete(join(dir, file));
cleared++;
details.push(`${record.unitId} (was ${state.status})`);
} catch (_err) {
// Non-fatal — record stays, but at least we tried
}
}
return { cleared, details };
}
/**
* Clear runtime records whose durable artifacts already prove completion.
*
* Purpose: recover from crashes, process timeouts, or hard exits that happen
* after a unit wrote its durable completion artifacts but before the in-memory
* finalizer cleared `.sf/runtime/units/*.json`.
*
* Consumer: auto-mode bootstrap before dispatching the next autonomous unit.
*/
export async function reconcileDurableCompleteUnitRuntimeRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return { cleared: 0, details: [] };
let cleared = 0;
const details = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
const abs = join(dir, file);
let record;
try {
record = JSON.parse(readFileSync(abs, "utf-8"));
} catch {
continue;
}
if (!record.unitType || !record.unitId) continue;
let durableComplete = false;
if (record.unitType === "execute-task") {
const status = await inspectExecuteTaskDurability(
basePath,
record.unitId,
);
durableComplete = !!(
status?.summaryExists &&
status.taskChecked &&
status.nextActionAdvanced
);
} else if (record.unitType === "complete-slice") {
const { milestone: mid, slice: sid } = parseUnitId(record.unitId);
if (mid && sid) {
let dbComplete = false;
if (isDbAvailable()) {
try {
const sliceRow = getSlice(mid, sid);
dbComplete = sliceRow?.status === "complete";
} catch {
dbComplete = false;
}
}
const summaryPath = resolveSliceFile(basePath, mid, sid, "SUMMARY");
let artifactValid = false;
if (summaryPath && existsSync(summaryPath)) {
try {
const content = readFileSync(summaryPath, "utf-8");
const summary = parseSummary(content);
artifactValid = !!summary.frontmatter.completed_at;
} catch {
artifactValid = false;
}
}
durableComplete = dbComplete && artifactValid;
}
}
if (!durableComplete) continue;
try {
unlinkSync(abs);
_runtimeCache.delete(abs);
cleared++;
const state = getUnitRuntimeState(record);
details.push(`${record.unitType} ${record.unitId} (was ${state.status})`);
} catch {
// Non-fatal — leave the record for the next bootstrap/doctor pass.
}
}
return { cleared, details };
}
export {
clearUnitRuntimeRecord,
decideUnitRuntimeDispatch,
formatExecuteTaskRecoveryStatus,
getUnitRuntimeState,
inspectExecuteTaskDurability,
isSyntheticUnitRuntime,
isTerminalUnitRuntimeStatus,
listUnitRuntimeRecords,
readUnitRuntimeRecord,
reconcileDurableCompleteUnitRuntimeRecords,
reconcileStaleCompleteSliceRecords,
UNIT_RUNTIME_STATUSES,
UNIT_RUNTIME_TERMINAL_STATUSES,
UNIT_RUNTIME_TRANSITIONS,
writeUnitRuntimeRecord,
} from "./uok/unit-runtime.js";

View file

@ -33,6 +33,47 @@ function refreshParityReport(basePath) {
function resolveKernelPathLabel() {
return "uok-kernel";
}
/**
* Records an abnormal UOK kernel termination in both durable stores.
*
* Purpose: keep the DB-backed UOK run ledger and JSONL parity heartbeat
* symmetrical when auto-mode exits via signal and bypasses the async kernel
* finally block.
*
* Consumer: auto signal cleanup and UOK parity tests.
*/
export function recordUokKernelTermination({
basePath,
runId,
sessionId,
flags,
status = "signal",
error,
}) {
const endedAt = new Date().toISOString();
if (runId && isDbAvailable()) {
recordUokRunExit({
runId,
sessionId,
path: resolveKernelPathLabel(),
flags: { ...(flags ?? {}) },
status,
endedAt,
...(error ? { error } : {}),
});
}
writeParityHeartbeat(basePath, {
ts: endedAt,
...(runId ? { runId } : {}),
sessionId,
path: resolveKernelPathLabel(),
flags: { ...(flags ?? {}) },
phase: "exit",
status,
...(error ? { error } : {}),
});
return refreshParityReport(basePath);
}
export async function runAutoLoopWithUok(args) {
const { ctx, pi, s, deps, runKernelLoop } = args;
const prefs = deps.loadEffectiveSFPreferences()?.preferences;
@ -104,29 +145,14 @@ export async function runAutoLoopWithUok(args) {
error = err instanceof Error ? err.message : String(err);
throw err;
} finally {
const endedAt = new Date().toISOString();
if (isDbAvailable()) {
recordUokRunExit({
runId,
sessionId: ctx.sessionManager?.getSessionId?.(),
path: resolveKernelPathLabel(),
flags: { ...flags },
status,
endedAt,
...(error ? { error } : {}),
});
}
writeParityHeartbeat(s.basePath, {
ts: endedAt,
recordUokKernelTermination({
basePath: s.basePath,
runId,
sessionId: ctx.sessionManager?.getSessionId?.(),
path: resolveKernelPathLabel(),
flags: { ...flags },
phase: "exit",
status,
...(error ? { error } : {}),
});
refreshParityReport(s.basePath);
if (s.currentUokRunId === runId) s.currentUokRunId = undefined;
}
}

View file

@ -8,7 +8,7 @@ import {
} from "node:fs";
import { join } from "node:path";
import { sfRoot } from "../paths.js";
import { getUokRuns, isDbAvailable } from "../sf-db.js";
import { getUokRuns, isDbAvailable, recordUokRunExit } from "../sf-db.js";
export const UNMATCHED_RUN_STALE_MS = 30 * 60 * 1000;
@ -38,6 +38,37 @@ function isFreshTimestamp(value, nowMs, staleMs) {
const ms = timestampMs(value);
return ms !== undefined && nowMs - ms <= staleMs;
}
function hasLiveAutoLock(basePath) {
const lockPath = join(sfRoot(basePath), "auto.lock");
if (!existsSync(lockPath)) return false;
try {
const lock = JSON.parse(readFileSync(lockPath, "utf-8"));
const pid = Number(lock.pid);
if (!Number.isFinite(pid) || pid <= 0) return false;
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
function recoverOrphanedStartedLedgerRuns(basePath, ledgerRuns, nowIso) {
if (hasLiveAutoLock(basePath)) return 0;
let recovered = 0;
for (const run of ledgerRuns) {
if (run.status !== "started" || run.endedAt) continue;
recordUokRunExit({
runId: run.runId,
sessionId: run.sessionId,
path: run.path,
flags: run.flags ?? {},
status: "recovered",
endedAt: nowIso,
error: "uok recovered missing exit: no live auto.lock owner",
});
recovered += 1;
}
return recovered;
}
export function parseParityEvents(raw) {
return raw
.split("\n")
@ -275,6 +306,14 @@ export function writeParityReport(basePath, nowMs = Date.now()) {
let ledgerRuns = [];
try {
ledgerRuns = isDbAvailable() ? getUokRuns() : [];
if (ledgerRuns.length > 0 && isDbAvailable()) {
const recovered = recoverOrphanedStartedLedgerRuns(
basePath,
ledgerRuns,
new Date(nowMs).toISOString(),
);
if (recovered > 0) ledgerRuns = getUokRuns();
}
} catch {
ledgerRuns = [];
}

View file

@ -0,0 +1,622 @@
import {
existsSync,
mkdirSync,
readdirSync,
readFileSync,
unlinkSync,
writeFileSync,
} from "node:fs";
import { join } from "node:path";
import {
countMustHavesMentionedInSummary,
loadFile,
parseSummary,
parseTaskPlanMustHaves,
} from "../files.js";
import {
relSliceFile,
relTaskFile,
resolveSliceFile,
resolveTaskFile,
sfRoot,
} from "../paths.js";
import { getSlice, isDbAvailable } from "../sf-db.js";
import { parseUnitId } from "../unit-id.js";
/**
* Lists every unit runtime projection status in UOK lifecycle order.
*
* Purpose: keep the `.sf/runtime/units` compatibility projection aligned with
* the UOK-owned lifecycle vocabulary so query and older recovery surfaces do not
* redispatch terminal units through ambiguous legacy phases.
*
* Consumer: auto runtime persistence, unit-runtime tests, headless query summaries.
*/
export const UNIT_RUNTIME_STATUSES = [
"queued",
"claimed",
"running",
"progress",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
"notified",
];
/**
* Names the unit statuses that end an execution attempt.
*
* Purpose: centralize the terminal-state union so retry and notification policy
* does not drift between watchdog recovery and dispatch preview logic.
*
* Consumer: decideUnitRuntimeDispatch and operator-facing query summaries.
*/
export const UNIT_RUNTIME_TERMINAL_STATUSES = [
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
];
/**
* Describes the explicit unit runtime lifecycle transitions.
*
* Purpose: make retry, notification, and reset projection transitions reviewable
* as data while UOK owns authoritative lifecycle and recovery decisions.
*
* Consumer: unit runtime tests and UOK projection/reconciler guards.
*/
export const UNIT_RUNTIME_TRANSITIONS = {
queued: ["claimed", "cancelled"],
claimed: ["running", "stale", "cancelled"],
running: [
"progress",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
],
progress: [
"running",
"completed",
"failed",
"blocked",
"cancelled",
"stale",
"runaway-recovered",
],
completed: ["notified"],
failed: ["queued", "notified"],
blocked: ["notified"],
cancelled: ["notified"],
stale: ["queued", "notified"],
"runaway-recovered": ["queued", "notified"],
notified: ["queued"],
};
const DEFAULT_UNIT_RUNTIME_MAX_RETRIES = 1;
const RETRYABLE_TERMINAL_STATUSES = new Set([
"failed",
"stale",
"runaway-recovered",
]);
function hasUpdate(updates, key) {
return Object.hasOwn(updates, key);
}
function phaseForStatus(status) {
switch (status) {
case "queued":
case "claimed":
case "running":
return "dispatched";
case "progress":
return "wrapup-warning-sent";
case "completed":
return "finalized";
default:
return status;
}
}
function inferStatusFromPhase(phase, record) {
if (UNIT_RUNTIME_STATUSES.includes(phase)) {
return phase;
}
switch (phase) {
case "dispatched":
return "running";
case "wrapup-warning-sent":
case "runaway-warning-sent":
case "runaway-final-warning-sent":
case "recovered":
return "progress";
case "timeout":
return "stale";
case "finalized":
return "completed";
case "paused":
return record?.runawayGuardPause ? "runaway-recovered" : "blocked";
case "skipped":
return "blocked";
default:
return "running";
}
}
function retryBudgetRemaining(retryCount, maxRetries) {
return Math.max(0, maxRetries - retryCount);
}
/**
* Returns true when a runtime status is terminal for one execution attempt.
*
* Purpose: keep terminal-state checks exhaustive against the exported terminal
* union rather than hard-coded differently at each caller.
*
* Consumer: decideUnitRuntimeDispatch and query summary generation.
*/
export function isTerminalUnitRuntimeStatus(status) {
return UNIT_RUNTIME_TERMINAL_STATUSES.includes(status);
}
/**
* Returns the normalized UOK runtime state embedded in a runtime record.
*
* Purpose: let legacy records with only `phase` still participate in retry and
* query policy while new records persist explicit runtime state fields.
*
* Consumer: decideUnitRuntimeDispatch and headless query summaries.
*/
export function getUnitRuntimeState(record) {
const status = record.status ?? inferStatusFromPhase(record.phase, record);
const retryCount = record.retryCount ?? record.recoveryAttempts ?? 0;
const maxRetries = record.maxRetries ?? DEFAULT_UNIT_RUNTIME_MAX_RETRIES;
return {
status,
retryCount,
maxRetries,
lastHeartbeatAt: record.lastHeartbeatAt ?? null,
lastProgressAt: record.lastProgressAt,
lastOutputAt: record.lastOutputAt ?? null,
outputPath: record.outputPath ?? null,
watchdogReason: record.watchdogReason ?? null,
notifiedAt: record.notifiedAt ?? null,
};
}
/**
* Returns true for synthetic units that must be reset before rerun.
*
* Purpose: prevent synthetic orchestration units such as parallel research from
* looping after failure while preserving normal task retry behavior.
*
* Consumer: decideUnitRuntimeDispatch.
*/
export function isSyntheticUnitRuntime(record) {
return (
record.unitType === "synthetic" ||
record.unitId.includes("parallel-research")
);
}
/**
* Decides whether a unit runtime record permits dispatch, retry, notify, or block.
*
* Purpose: enforce retry budgets and explicit reset requirements before callers
* schedule another copy of a failed or stale unit.
*
* Consumer: unit-runtime projection tests and headless query runtime summaries.
*/
export function decideUnitRuntimeDispatch(record, options = {}) {
if (!record) {
return {
action: "dispatch",
reasonCode: "no-runtime-record",
retryCount: 0,
maxRetries: DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
retryBudgetRemaining: DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
};
}
const state = getUnitRuntimeState(record);
const remaining = retryBudgetRemaining(state.retryCount, state.maxRetries);
const common = {
retryCount: state.retryCount,
maxRetries: state.maxRetries,
retryBudgetRemaining: remaining,
};
if (state.notifiedAt !== null) {
return { action: "skip", reasonCode: "already-notified", ...common };
}
if (state.status === "notified") {
return { action: "skip", reasonCode: "notified", ...common };
}
if (state.status === "queued") {
return { action: "dispatch", reasonCode: "queued", ...common };
}
if (!isTerminalUnitRuntimeStatus(state.status)) {
return { action: "skip", reasonCode: "active-or-claimed", ...common };
}
const synthetic = options.synthetic ?? isSyntheticUnitRuntime(record);
if (synthetic && state.status !== "completed") {
return {
action: "block",
reasonCode: "synthetic-reset-required",
...common,
};
}
if (RETRYABLE_TERMINAL_STATUSES.has(state.status)) {
if (remaining > 0) {
return {
action: "retry",
reasonCode: "retry-budget-available",
...common,
};
}
return { action: "block", reasonCode: "retry-budget-exhausted", ...common };
}
if (
state.status === "completed" ||
state.status === "blocked" ||
state.status === "cancelled"
) {
return {
action: "notify",
reasonCode: "terminal-ready-to-notify",
...common,
};
}
return { action: "skip", reasonCode: "terminal-nonretryable", ...common };
}
function runtimeDir(basePath) {
return join(sfRoot(basePath), "runtime", "units");
}
function runtimePath(basePath, unitType, unitId) {
const sanitizedUnitType = unitType.replace(/[/]/g, "-");
const sanitizedUnitId = unitId.replace(/[/]/g, "-");
return join(
runtimeDir(basePath),
`${sanitizedUnitType}-${sanitizedUnitId}.json`,
);
}
// ─── In-memory runtime record cache ─────────────────────────────────────────
// Avoids repeated disk reads for the same unit within a single dispatch cycle.
const _runtimeCache = new Map();
function readUnitRuntimeRecordFromDisk(path) {
if (!existsSync(path)) return null;
try {
return JSON.parse(readFileSync(path, "utf-8"));
} catch {
return null;
}
}
export function writeUnitRuntimeRecord(
basePath,
unitType,
unitId,
startedAt,
updates = {},
) {
const dir = runtimeDir(basePath);
mkdirSync(dir, { recursive: true });
const path = runtimePath(basePath, unitType, unitId);
const prev = _runtimeCache.get(path) ?? null;
const phase =
updates.phase ??
(updates.status ? phaseForStatus(updates.status) : prev?.phase) ??
"dispatched";
const status =
updates.status ??
(updates.phase || !prev?.status
? inferStatusFromPhase(phase, {
runawayGuardPause:
updates.runawayGuardPause ?? prev?.runawayGuardPause,
})
: prev.status);
const recoveryAttempts = hasUpdate(updates, "recoveryAttempts")
? (updates.recoveryAttempts ?? 0)
: (prev?.recoveryAttempts ?? 0);
const retryCount = hasUpdate(updates, "retryCount")
? (updates.retryCount ?? 0)
: hasUpdate(updates, "recoveryAttempts")
? (updates.recoveryAttempts ?? 0)
: (prev?.retryCount ?? recoveryAttempts ?? 0);
const next = {
version: 1,
unitType,
unitId,
startedAt,
updatedAt: Date.now(),
phase,
status,
wrapupWarningSent:
updates.wrapupWarningSent ?? prev?.wrapupWarningSent ?? false,
continueHereFired:
updates.continueHereFired ?? prev?.continueHereFired ?? false,
timeoutAt: hasUpdate(updates, "timeoutAt")
? (updates.timeoutAt ?? null)
: (prev?.timeoutAt ?? null),
lastHeartbeatAt: hasUpdate(updates, "lastHeartbeatAt")
? (updates.lastHeartbeatAt ?? null)
: (prev?.lastHeartbeatAt ?? startedAt),
lastProgressAt:
updates.lastProgressAt ?? prev?.lastProgressAt ?? Date.now(),
progressCount: updates.progressCount ?? prev?.progressCount ?? 0,
lastProgressKind:
updates.lastProgressKind ?? prev?.lastProgressKind ?? "dispatch",
lastOutputAt: hasUpdate(updates, "lastOutputAt")
? (updates.lastOutputAt ?? null)
: (prev?.lastOutputAt ?? null),
outputPath: hasUpdate(updates, "outputPath")
? (updates.outputPath ?? null)
: (prev?.outputPath ?? null),
watchdogReason: hasUpdate(updates, "watchdogReason")
? (updates.watchdogReason ?? null)
: (prev?.watchdogReason ?? null),
notifiedAt: hasUpdate(updates, "notifiedAt")
? (updates.notifiedAt ?? null)
: (prev?.notifiedAt ?? null),
recovery: updates.recovery ?? prev?.recovery,
recoveryAttempts,
retryCount,
maxRetries:
updates.maxRetries ??
prev?.maxRetries ??
DEFAULT_UNIT_RUNTIME_MAX_RETRIES,
lastRecoveryReason: updates.lastRecoveryReason ?? prev?.lastRecoveryReason,
runawayGuardPause: updates.runawayGuardPause ?? prev?.runawayGuardPause,
};
writeFileSync(path, JSON.stringify(next, null, 2) + "\n", "utf-8");
_runtimeCache.set(path, next);
return next;
}
export function readUnitRuntimeRecord(basePath, unitType, unitId) {
const path = runtimePath(basePath, unitType, unitId);
const cached = _runtimeCache.get(path);
if (cached !== undefined) return cached;
const record = readUnitRuntimeRecordFromDisk(path);
if (record !== null) _runtimeCache.set(path, record);
return record;
}
export function clearUnitRuntimeRecord(basePath, unitType, unitId) {
const path = runtimePath(basePath, unitType, unitId);
_runtimeCache.delete(path);
if (existsSync(path)) unlinkSync(path);
}
/**
* Return all runtime records currently on disk for `basePath`.
* Returns an empty array if the runtime directory does not exist.
*/
export function listUnitRuntimeRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return [];
const results = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
try {
const raw = readFileSync(join(dir, file), "utf-8");
const record = JSON.parse(raw);
results.push(record);
} catch {
// Skip malformed files
}
}
return results;
}
export async function inspectExecuteTaskDurability(basePath, unitId) {
const { milestone: mid, slice: sid, task: tid } = parseUnitId(unitId);
if (!mid || !sid || !tid) return null;
const planAbs = resolveSliceFile(basePath, mid, sid, "PLAN");
const summaryAbs = resolveTaskFile(basePath, mid, sid, tid, "SUMMARY");
const stateAbs = join(sfRoot(basePath), "STATE.md");
const planPath = relSliceFile(basePath, mid, sid, "PLAN");
const summaryPath = relTaskFile(basePath, mid, sid, tid, "SUMMARY");
const planContent = planAbs ? await loadFile(planAbs) : null;
const stateContent = existsSync(stateAbs)
? readFileSync(stateAbs, "utf-8")
: "";
const summaryExists = !!(summaryAbs && existsSync(summaryAbs));
const escapedTid = tid.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const taskChecked =
!!planContent &&
new RegExp(`^- \\[[xX]\\] \\*\\*${escapedTid}:`, "m").test(planContent);
const nextActionAdvanced = !new RegExp(`Execute ${tid}\\b`).test(
stateContent,
);
// Must-have coverage: load task plan and count mentions in summary
let mustHaveCount = 0;
let mustHavesMentionedInSummary = 0;
const taskPlanAbs = resolveTaskFile(basePath, mid, sid, tid, "PLAN");
if (taskPlanAbs) {
const taskPlanContent = await loadFile(taskPlanAbs);
if (taskPlanContent) {
const mustHaves = parseTaskPlanMustHaves(taskPlanContent);
mustHaveCount = mustHaves.length;
if (mustHaveCount > 0 && summaryExists && summaryAbs) {
const summaryContent = await loadFile(summaryAbs);
if (summaryContent) {
mustHavesMentionedInSummary = countMustHavesMentionedInSummary(
mustHaves,
summaryContent,
);
}
}
}
}
return {
planPath,
summaryPath,
summaryExists,
taskChecked,
nextActionAdvanced,
mustHaveCount,
mustHavesMentionedInSummary,
};
}
export function formatExecuteTaskRecoveryStatus(status) {
const missing = [];
if (!status.summaryExists)
missing.push(`summary missing (${status.summaryPath})`);
if (!status.taskChecked)
missing.push(`task checkbox unchecked in ${status.planPath}`);
if (!status.nextActionAdvanced)
missing.push("state next action still points at the timed-out task");
if (
status.mustHaveCount > 0 &&
status.mustHavesMentionedInSummary < status.mustHaveCount
) {
missing.push(
`must-have gap: ${status.mustHavesMentionedInSummary} of ${status.mustHaveCount} must-haves addressed in summary`,
);
}
return missing.length > 0
? missing.join("; ")
: "all durable task artifacts present";
}
// ─── Stale slice runtime record reconciliation ──────────────────────────────
/**
* Clear unit runtime records for complete-slice units that are in a terminal
* non-completed state (cancelled, failed, stale) but whose slice is actually
* complete in the DB and has a valid SUMMARY.md.
*
* Purpose: prevent the pi runtime flow-audit from emitting false-positive
* stale-dispatch warnings for slices that completed successfully on retry.
* The flow-audit reads journal/runtime state but does not check for later
* successful retries or existing artifact files (#sf-moqv5o7h-vaabu6).
*
* Consumer: bootstrapAutoSession in auto-start.ts, called after
* cleanStaleRuntimeUnits.
*/
export function reconcileStaleCompleteSliceRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return { cleared: 0, details: [] };
let cleared = 0;
const details = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
let record;
try {
record = JSON.parse(readFileSync(join(dir, file), "utf-8"));
} catch {
continue;
}
if (record.unitType !== "complete-slice") continue;
const state = getUnitRuntimeState(record);
// Only target terminal non-completed states that could trigger
// flow-audit warnings.
if (
!["cancelled", "failed", "stale", "runaway-recovered"].includes(
state.status,
)
)
continue;
const { milestone: mid, slice: sid } = parseUnitId(record.unitId);
if (!mid || !sid) continue;
// DB check: slice status must be "complete"
let dbComplete = false;
if (isDbAvailable()) {
try {
const sliceRow = getSlice(mid, sid);
dbComplete = sliceRow?.status === "complete";
} catch {
// DB read failure — skip this record rather than risk data loss
continue;
}
}
if (!dbComplete) continue;
// Artifact check: SUMMARY.md must exist with a valid completed_at
const summaryPath = resolveSliceFile(basePath, mid, sid, "SUMMARY");
let artifactValid = false;
if (summaryPath && existsSync(summaryPath)) {
try {
const content = readFileSync(summaryPath, "utf-8");
const summary = parseSummary(content);
artifactValid = !!summary.frontmatter.completed_at;
} catch {
artifactValid = false;
}
}
if (!artifactValid) continue;
// All checks pass — clear the stale runtime record
try {
unlinkSync(join(dir, file));
_runtimeCache.delete(join(dir, file));
cleared++;
details.push(`${record.unitId} (was ${state.status})`);
} catch (_err) {
// Non-fatal — record stays, but at least we tried
}
}
return { cleared, details };
}
/**
* Clear runtime records whose durable artifacts already prove completion.
*
* Purpose: recover from crashes, process timeouts, or hard exits that happen
* after a unit wrote its durable completion artifacts but before the in-memory
* finalizer cleared `.sf/runtime/units/*.json`.
*
* Consumer: auto-mode bootstrap before dispatching the next autonomous unit.
*/
export async function reconcileDurableCompleteUnitRuntimeRecords(basePath) {
const dir = runtimeDir(basePath);
if (!existsSync(dir)) return { cleared: 0, details: [] };
let cleared = 0;
const details = [];
for (const file of readdirSync(dir)) {
if (!file.endsWith(".json")) continue;
const abs = join(dir, file);
let record;
try {
record = JSON.parse(readFileSync(abs, "utf-8"));
} catch {
continue;
}
if (!record.unitType || !record.unitId) continue;
let durableComplete = false;
if (record.unitType === "execute-task") {
const status = await inspectExecuteTaskDurability(
basePath,
record.unitId,
);
durableComplete = !!(
status?.summaryExists &&
status.taskChecked &&
status.nextActionAdvanced
);
} else if (record.unitType === "complete-slice") {
const { milestone: mid, slice: sid } = parseUnitId(record.unitId);
if (mid && sid) {
let dbComplete = false;
if (isDbAvailable()) {
try {
const sliceRow = getSlice(mid, sid);
dbComplete = sliceRow?.status === "complete";
} catch {
dbComplete = false;
}
}
const summaryPath = resolveSliceFile(basePath, mid, sid, "SUMMARY");
let artifactValid = false;
if (summaryPath && existsSync(summaryPath)) {
try {
const content = readFileSync(summaryPath, "utf-8");
const summary = parseSummary(content);
artifactValid = !!summary.frontmatter.completed_at;
} catch {
artifactValid = false;
}
}
durableComplete = dbComplete && artifactValid;
}
}
if (!durableComplete) continue;
try {
unlinkSync(abs);
_runtimeCache.delete(abs);
cleared++;
const state = getUnitRuntimeState(record);
details.push(`${record.unitType} ${record.unitId} (was ${state.status})`);
} catch {
// Non-fatal — leave the record for the next bootstrap/doctor pass.
}
}
return { cleared, details };
}

View file

@ -7,7 +7,7 @@ import {
reconcileDurableCompleteUnitRuntimeRecords,
reconcileStaleCompleteSliceRecords,
writeUnitRuntimeRecord,
} from "../resources/extensions/sf/unit-runtime.js";
} from "../resources/extensions/sf/uok/unit-runtime.js";
describe("reconcileStaleCompleteSliceRecords", () => {
let basePath: string;