From 7318af029a08a9fe2341b0f17ac3039e5dba90f1 Mon Sep 17 00:00:00 2001 From: Mikael Hugo Date: Fri, 8 May 2026 18:18:47 +0200 Subject: [PATCH] sf snapshot: uncommitted changes after 33m inactivity --- docs/plans/A2A_ADOPTION_PLAN.md | 1100 ++++++++++++++++++++++++++++ docs/plans/MISSION-A2A-ADOPTION.md | 124 ++++ flake.nix | 1 - 3 files changed, 1224 insertions(+), 1 deletion(-) create mode 100644 docs/plans/A2A_ADOPTION_PLAN.md create mode 100644 docs/plans/MISSION-A2A-ADOPTION.md diff --git a/docs/plans/A2A_ADOPTION_PLAN.md b/docs/plans/A2A_ADOPTION_PLAN.md new file mode 100644 index 000000000..5e9352c37 --- /dev/null +++ b/docs/plans/A2A_ADOPTION_PLAN.md @@ -0,0 +1,1100 @@ +# A2A Adoption Plan for Singularity-Forge — Production Grade + +**Author:** Research synthesis +**Date:** 2026-05-08 +**Status:** Draft — for review +**Scope:** A2A as the internal agent communication protocol for SF dispatch layer + +--- + +## Executive Summary + +SF's 5 dispatch mechanisms + MessageBus are functionally complete but architecturally siloed. A2A provides a standardized protocol that maps 1:1 onto SF's semantics. The existing MessageBus is preserved as the transport; A2A is the semantic layer on top. + +**This is a production-grade plan.** Every section covers: error handling, failure modes, rollback procedures, observability, and testing strategy. + +--- + +## Quick Reference + +| Concern | Decision | +|---|---| +| A2A as internal protocol | YES — standardizes Task state, priority, capability discovery | +| MessageBus | Wrap as `A2AMessageService` transport; add `AgentRegistry` | +| Transport | SQLite-backed MessageBus (not HTTP/WebSocket) for local process agents | +| External A2A | Optional; wired later when HTTP exposure is needed | +| Migration | 6 phases; each phase is independently deployable and rollback-safe | +| Feature flag | `SF_A2A_ENABLED` — gates all new A2A behavior; default OFF until Phase 6 | + +--- + +## 1. 
Architecture Overview + +### 1.1 System Diagram + +``` +┌──────────────────────────────────────────────────────────────────────┐ +│ Coordinator (UOK Kernel or subagent tool) │ +│ ┌────────────────────────────────────────────────────────────┐ │ +│ │ DispatchService │ │ +│ │ ├── A2AClient (send/receive) │ │ +│ │ ├── AgentRegistry (capability lookup) │ │ +│ │ └── AgentCard (self-description) │ │ +│ └────────────────────────────────────────────────────────────┘ │ +└───────────────────────────┬──────────────────────────────────────────┘ + │ A2AMessageService (wraps MessageBus) + │ bus.send(), bus.broadcast(), bus.sendOnce() + ▼ +┌──────────────────────────────────────────────────────────────────────┐ +│ MessageBus (SQLite-backed, existing) │ +│ ├── Durable at-least-once delivery │ +│ ├── TTL-based auto-compaction │ +│ ├── AgentInbox per agent (per-queue) │ +│ └── sendOnce for idempotent delivery │ +└───────────────────────────┬──────────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────────────────────┐ +│ Worker Agents (git worktrees, one per milestone/slice) │ +│ ├── AgentCard (role: worker, isolation: full) │ +│ ├── AgentInbox subscription │ +│ ├── Project SQLite WAL (read/write) │ +│ └── Emits: task_updated, cost, heartbeat │ +└──────────────────────────────────────────────────────────────────────┘ + +┌──────────────────────────────────────────────────────────────────────┐ +│ Constrained Subagents (no project DB) │ +│ ├── AgentCard (role: subagent, isolation: constrained) │ +│ ├── Limited tool scope (4 tools) │ +│ ├── AgentInbox (optional, opt-in via useMessageBus) │ +│ └── Returns structured output via A2A message │ +└──────────────────────────────────────────────────────────────────────┘ +``` + +### 1.2 A2A Semantic Mapping + +| SF Concept | A2A Concept | +|---|---| +| milestone / slice / task | A2A Task (`id`, `status`, `metadata`) | +| UOK Kernel | A2A Client + Coordinator Agent | +| Worker (parallel orchestrator) 
| A2A Agent | +| MessageBus.send() | A2A MessageService.send() | +| MessageBus.sendOnce() | A2A idempotent delivery | +| MessageBus.broadcast() | A2A MessageService.broadcast() | +| AgentInbox per worker | A2A per-agent subscription queue | +| File-based status files | A2A AgentStatus (online/busy/idle/offline/error) | +| adversarial-partner/combatant/architect | A2A Agent with specialized capabilities | +| parallel / debate / chain modes | A2A CommunicationPattern | + +--- + +## 2. A2A Type System + +### 2.1 Core Types + +```typescript +// File: src/resources/extensions/sf/dispatch/a2a-types.ts + +import type { + AgentCard, + AgentCapabilities, + Task, + TaskStatus, + Message, +} from "@a2a-js/sdk"; + +/** + * A2A Task state — maps directly from SF unit runtime status. + * These are the ONLY authoritative task states. + */ +export const A2A_TASK_STATES = [ + "submitted", + "working", + "completed", + "failed", + "cancelled", +] as const; +export type A2ATaskState = (typeof A2A_TASK_STATES)[number]; + +/** + * SF-specific task extensions — runtime states that A2A doesn't model. + * These live in task.metadata.sf_state and are NOT authoritative. + * DB is the authority for these. + */ +export const SF_TASK_EXTENSIONS = [ + "verifying", + "reviewing", + "blocked", + "paused", + "retrying", + "pending_input", +] as const; +export type SFTaskExtension = (typeof SF_TASK_EXTENSIONS)[number]; + +/** + * Message priority levels — determines delivery urgency and retry budget. + */ +export const MESSAGE_PRIORITIES = ["low", "normal", "high", "urgent"] as const; +export type MessagePriority = (typeof MESSAGE_PRIORITIES)[number]; + +/** + * Dispatch mode → A2A CommunicationPattern mapping. + */ +export const DISPATCH_TO_PATTERN: Record = { + single: "request_response", + parallel: "notification", + debate: "streaming", + chain: "request_response", +}; + +/** + * SF-specific capability extensions on top of A2A AgentCapabilities. 
+ */ +export interface SFAgentCapabilities extends AgentCapabilities { + /** Domain role */ + role: "coordinator" | "worker" | "subagent" | "reviewer" | "adversary" | "architect" | "researcher"; + /** Isolation level — determines DB access */ + isolation: "full" | "constrained"; + /** For constrained agents — which tools are permitted */ + toolScope?: Array<"file_read" | "file_write" | "execute" | "query" | "memory_read" | "memory_write">; + /** Model tier for cost and routing decisions */ + modelTier: "primary" | "validation" | "worker"; + /** Domain specializations */ + specializations?: Array< + | "milestone_planning" + | "slice_planning" + | "code_review" + | "security_review" + | "adversarial_review" + | "architecture_analysis" + | "research" + | "verification" + >; +} + +/** + * SF AgentCard — extends A2A AgentCard with SF-specific capabilities. + * Published by each agent on startup; cached in AgentRegistry. + */ +export interface SFAgentCard extends AgentCard { + capabilities: SFAgentCapabilities; + metadata?: { + basePath?: string; + milestoneId?: string; + sliceId?: string; + worktreePath?: string; + pid?: number; + startedAt?: string; + }; +} + +/** + * SF Task metadata — stored in A2A Task.metadata. + * sf_state is NOT authoritative — DB is the authority. + */ +export interface SFTaskMetadata { + scope: "milestone" | "slice" | "task" | "inline"; + milestoneId: string; + sliceId?: string; + taskId?: string; + title: string; + /** Non-authoritative runtime hint — DB is authority */ + sf_state?: SFTaskExtension; + /** Base path for DB access */ + basePath: string; +} + +/** + * A2A Message envelope used internally. + * Wraps MessageBus messages with A2A metadata. 
+ */ +export interface SFA2AMessage { + id: string; + type: "message" | "task_submitted" | "task_updated" | "task_completed" | "control" | "error"; + from: string; + to: string | string[]; + body: Record; + priority: MessagePriority; + sentAt: string; + deliveredAt?: string; + correlationId?: string; + conversationId?: string; + ttlMs?: number; + taskId?: string; + metadata?: Record; +} +``` + +--- + +## 3. Error Handling + +### 3.1 Message Delivery Errors + +| Error | Detection | Response | +|---|---|---| +| Recipient offline | `AgentRegistry.getStatus() === "offline"` | Buffer message; deliver on reconnect | +| Inbox full (max 1000) | `AgentInbox.unreadCount >= maxInboxSize` | Reject with `TOO_MANY_PENDING`; caller retries with backoff | +| TTL exceeded | `Date.now() - sentAt > ttlMs` | Discard; caller notified via error response | +| DB write conflict | SQLite `SQLITE_BUSY` | Retry with exponential backoff (max 3 attempts, 100ms base) | +| Invalid recipient | `AgentRegistry.getCard(to) === undefined` | Return `AGENT_NOT_FOUND` error; do not retry | + +### 3.2 Retry Strategy + +```typescript +// File: src/resources/extensions/sf/dispatch/a2a-service.ts + +const RETRY_CONFIG = { + maxAttempts: 3, + baseDelayMs: 100, + maxDelayMs: 5000, + backoffMultiplier: 2.0, + jitterFactor: 0.1, // 10% random jitter to prevent thundering herd +} as const; + +export class DeliveryError extends Error { + constructor( + message: string, + public readonly code: string, + public readonly retryable: boolean, + public readonly attempts: number, + ) { + super(message); + this.name = "DeliveryError"; + } +} + +async function sendWithRetry( + params: SendParams, + attempt = 1, +): Promise { + const { from, to, body, metadata = {} } = params; + + try { + return await doSend(from, to, body, metadata); + } catch (err) { + const isRetryable = + err instanceof DeliveryError && err.retryable && attempt < RETRY_CONFIG.maxAttempts; + + if (!isRetryable) { + throw err; + } + + const delay = 
Math.min( + RETRY_CONFIG.baseDelayMs * Math.pow(RETRY_CONFIG.backoffMultiplier, attempt - 1), + RETRY_CONFIG.maxDelayMs, + ); + const jitter = delay * RETRY_CONFIG.jitterFactor * Math.random(); + await sleep(delay + jitter); + + return sendWithRetry(params, attempt + 1); + } +} +``` + +### 3.3 Agent Crash Handling + +``` +Worker crash detection: + 1. Worker process exits → SIGCHLD handler + 2. Update AgentRegistry status: "offline" + 3. MessageBus retains undelivered messages (TTL not expired) + 4. Coordinator polls AgentRegistry.getStatus() every 30s + 5. On reconnect: worker re-registers AgentCard + 6. Buffered messages delivered to reconnected AgentInbox + 7. Coordinator re-sends any unacknowledged task_updated messages +``` + +### 3.4 Panic Mode + +When `messageService` fails to deliver HIGH/URGENT messages 3 times consecutively: + +1. Log `A2A_DELIVERY_PANIC` event to `.sf/journal/` +2. Fall back to file-based signal (`session-status-io.js`) +3. Emit `sf_dispatch_degraded` event +4. Dashboard shows "dispatch degraded" warning +5. Auto-recovery when MessageBus recovers + +--- + +## 4. 
Backpressure and Flow Control + +### 4.1 Per-Agent Inbox Backpressure + +```typescript +// File: src/resources/extensions/sf/dispatch/a2a-service.ts + +const INBOX_CONFIG = { + maxInboxSize: 1000, // Per-agent queue limit + maxMessageSizeBytes: 64 * 1024, // 64 KB per message body + highWaterMark: 800, // Warn when inbox reaches 80% + overflowAction: "reject", // "reject" | "drop_oldest" +} as const; + +interface SendParams { + from: string; + to: string; + body: Record; + metadata?: { + priority?: MessagePriority; + ttlMs?: number; + replyTo?: string; + taskId?: string; + }; +} + +function validateSend(params: SendParams): void { + const bodySize = JSON.stringify(params.body).length; + if (bodySize > INBOX_CONFIG.maxMessageSizeBytes) { + throw new DeliveryError( + `Message body ${bodySize} bytes exceeds limit ${INBOX_CONFIG.maxMessageSizeBytes}`, + "MESSAGE_TOO_LARGE", + false, // Not retryable + 0, + ); + } + + const inbox = bus.getInbox(params.to); + if (inbox.unreadCount >= INBOX_CONFIG.maxInboxSize) { + throw new DeliveryError( + `Inbox for ${params.to} is full (${inbox.unreadCount}/${INBOX_CONFIG.maxInboxSize})`, + "INBOX_OVERFLOW", + true, // Retryable after inbox drains + 0, + ); + } + + if (inbox.unreadCount >= INBOX_CONFIG.highWaterMark) { + logWarning("dispatch", `Inbox for ${params.to} at ${inbox.unreadCount}/${INBOX_CONFIG.maxInboxSize}`); + } +} +``` + +### 4.2 Coordinator Outbox Backpressure + +When the coordinator sends faster than workers can consume: + +```typescript +// Coordinator: batch outgoing messages, flush on interval +const outbox = new Map(); +const FLUSH_INTERVAL_MS = 500; + +setInterval(() => { + for (const [to, messages] of outbox) { + if (messages.length === 0) continue; + bus.broadcast(coordinatorId, [to], { batch: messages }); + messages.length = 0; // drain + } +}, FLUSH_INTERVAL_MS); + +// Caller adds to outbox instead of sending immediately +function scheduleSend(params: SendParams): void { + const queue = outbox.get(params.to) 
?? []; + queue.push(wrapAsA2AMessage(params)); + outbox.set(params.to, queue); +} +``` + +### 4.3 Memory Budget Per Worker + +Each worker has a memory budget for buffering messages it cannot process immediately: + +``` +MAX_BUFFERED_MESSAGES_PER_WORKER = 100 +MAX_BUFFERED_BYTES_PER_WORKER = 10 * 1024 * 1024 // 10 MB +``` + +If a worker's inbox exceeds either limit, the oldest messages are dropped (not rejected — the sender already moved on). + +--- + +## 5. Observability + +### 5.1 Metrics + +```typescript +// File: src/resources/extensions/sf/dispatch/metrics.ts + +export const A2A_METRICS = { + // Message throughput + "sf_a2a_messages_sent_total": { + type: "counter", + help: "Total A2A messages sent", + labels: ["priority", "from_role", "to_role"], + }, + "sf_a2a_messages_delivered_total": { + type: "counter", + help: "Total A2A messages delivered to recipient inbox", + labels: ["priority", "from_role", "to_role"], + }, + "sf_a2a_messages_failed_total": { + type: "counter", + help: "Total A2A message delivery failures", + labels: ["priority", "error_code"], + }, + "sf_a2a_message_delivery_latency_ms": { + type: "histogram", + help: "End-to-end message delivery latency (send to inbox receipt)", + buckets: [10, 50, 100, 500, 1000, 5000], + }, + "sf_a2a_inbox_size": { + type: "gauge", + help: "Current inbox size per agent", + labels: ["agent_id", "role"], + }, + "sf_a2a_retry_total": { + type: "counter", + help: "Total retry attempts", + labels: ["priority", "attempt_number"], + }, + "sf_a2a_agent_status": { + type: "gauge", + help: "Agent status (1=online, 0.5=busy, 0.1=idle, 0=offline/error)", + labels: ["agent_id", "role"], + }, +} as const; +``` + +### 5.2 Structured Logging + +Every A2A operation emits structured log lines: + +```typescript +// File: src/resources/extensions/sf/dispatch/logger.ts + +type A2ALogEvent = + | { event: "a2a.send"; from: string; to: string; priority: string; messageId: string; sizeBytes: number } + | { event: "a2a.delivered"; 
messageId: string; to: string; latencyMs: number } + | { event: "a2a.delivery_failed"; messageId: string; error: string; retryable: boolean; attempt: number } + | { event: "a2a.agent_registered"; agentId: string; role: string; capabilities: string[] } + | { event: "a2a.agent_offline"; agentId: string; reason: string } + | { event: "a2a.inbox_overflow"; agentId: string; size: number; action: string } + | { event: "a2a.panic_mode"; reason: string; fallback_used: boolean }; + +function logA2A(event: A2ALogEvent): void { + const line = JSON.stringify({ + ts: new Date().toISOString(), + ...event, + }); + workflowLogger.log("dispatch", line); +} +``` + +### 5.3 Trace Context + +Propagate trace context through A2A messages for debugging: + +```typescript +interface TraceContext { + traceId: string; // ULID — unique per dispatch session + spanId: string; // Per-message ID + parentSpanId?: string; +} + +function injectTraceContext(msg: SFA2AMessage): SFA2AMessage { + const spanId = ulid(); + return { + ...msg, + metadata: { + ...msg.metadata, + trace: { + traceId: currentTraceId(), + spanId, + parentSpanId: currentSpanId(), + }, + }, + }; +} +``` + +Traces are stored in `.sf/journal/a2a-traces/{date}.jsonl` and queryable via `sf trace <traceId>`. + +--- + +## 6. Security + +### 6.1 Agent Authentication + +Every A2A message must carry a valid agent identity. Identity is established at agent startup: + +```typescript +// File: src/resources/extensions/sf/dispatch/auth.ts + +/** + * Agent identity token — HMAC-SHA256 of agent ID + basePath + startup timestamp. + * Used to authenticate messages from agents. + * Generated once at agent startup; stored in process.env.SF_AGENT_TOKEN. + */ +function generateAgentToken(agentId: string, basePath: string): string { + const secret = process.env.SF_A2A_SHARED_SECRET ?? process.env.SF_DB_KEY ?? 
"sf-insecure-dev-secret"; + const payload = `${agentId}:${basePath}:${Date.now()}`; + return createHmac("sha256", secret).update(payload).digest("hex").slice(0, 32); +} + +function verifyAgentToken(token: string, agentId: string): boolean { + // Tokens are single-use (generated per startup, not reusable) + // Verification is membership check: token must have been issued for this agentId + return validTokens.has(`${agentId}:${token}`); +} +``` + +### 6.2 Input Validation + +```typescript +// File: src/resources/extensions/sf/dispatch/validation.ts + +const MAX_BODY_DEPTH = 20; // Nested object depth +const MAX_ARRAY_LENGTH = 1000; // Max array items in body +const MAX_STRING_LENGTH = 100_000; // Max string value length +const FORBIDDEN_KEYS = ["__proto__", "constructor", "prototype"]; // Prototype pollution + +function validateMessageBody(body: unknown, depth = 0): void { + if (depth > MAX_BODY_DEPTH) throw new ValidationError("BODY_TOO_DEEP"); + if (Array.isArray(body)) { + if (body.length > MAX_ARRAY_LENGTH) throw new ValidationError("ARRAY_TOO_LARGE"); + for (const item of body) validateMessageBody(item, depth + 1); + return; + } + if (typeof body === "object" && body !== null) { + for (const [k, v] of Object.entries(body)) { + if (FORBIDDEN_KEYS.includes(k)) throw new ValidationError(`FORBIDDEN_KEY: ${k}`); + validateMessageBody(v, depth + 1); + } + return; + } + if (typeof body === "string" && body.length > MAX_STRING_LENGTH) { + throw new ValidationError("STRING_TOO_LONG"); + } +} +``` + +### 6.3 Capability Enforcement + +The `AgentRegistry` enforces that agents only perform actions consistent with their registered capabilities: + +```typescript +// File: src/resources/extensions/sf/dispatch/capability-enforcer.ts + +function enforceCapabilities(agentId: string, action: string): void { + const card = registry.getCard(agentId); + if (!card) throw new DeliveryError(`Unknown agent: ${agentId}`, "AGENT_NOT_FOUND", false, 0); + + const caps = card.capabilities as 
SFAgentCapabilities; + + switch (action) { + case "write_project_db": + if (caps.isolation !== "full") { + throw new DeliveryError( + `${agentId} cannot write project DB (isolation: ${caps.isolation})`, + "ISOLATION_VIOLATION", + false, + 0, + ); + } + break; + case "send_to_worker": + if (caps.role === "subagent") { + // Constrained subagents can only send to their parent + throw new DeliveryError("Subagent cannot send to workers", "CAPABILITY_DENIED", false, 0); + } + break; + case "read_project_context": + // All agents can read project context (it's in the prompt) + break; + } +} +``` + +--- + +## 7. Testing Strategy + +### 7.1 Unit Tests + +```typescript +// File: src/resources/extensions/sf/dispatch/a2a-service.test.ts + +describe("A2AMessageService", () => { + let bus: MessageBus; + let registry: AgentRegistry; + let service: A2AMessageService; + + beforeEach(() => { + bus = new MessageBus(tmpDir()); + registry = new AgentRegistry(tmpDir(), bus); + service = new A2AMessageService(tmpDir(), registry); + }); + + test("send_delivers_to_recipient_inbox", async () => { + registry.register(workerCard("worker:1")); + const id = service.send({ + from: "coordinator", + to: "worker:1", + body: { type: "task_submitted", taskId: "M01" }, + }); + const inbox = service.getInbox("worker:1"); + const msgs = inbox.list(); + expect(msgs).toHaveLength(1); + expect(msgs[0].id).toBe(id); + }); + + test("sendWithRetry_retries_on_retryable_error", async () => { + // Simulate transient DB busy + vi.spyOn(bus, "send").mockRejectedValueOnce(new DeliveryError("busy", "DB_BUSY", true, 1)); + vi.spyOn(bus, "send").mockResolvedValueOnce("msg-1"); + + const id = await sendWithRetry({ from: "c", to: "w", body: { test: true } }); + expect(id).toBe("msg-1"); + expect(bus.send).toHaveBeenCalledTimes(2); + }); + + test("sendWithRetry_does_not_retry_non_retryable_error", async () => { + vi.spyOn(bus, "send").mockRejectedValueOnce( + new DeliveryError("unknown agent", "AGENT_NOT_FOUND", false, 
1), + ); + await expect(sendWithRetry({ from: "c", to: "w", body: { test: true } })) + .rejects.toThrow("AGENT_NOT_FOUND"); + expect(bus.send).toHaveBeenCalledTimes(1); + }); + + test("sendOnce_same_key_returns_same_id", async () => { + const id1 = service.sendOnce({ from: "c", to: "w", body: { beat: 1 }, dedupeKey: "heartbeat" }); + const id2 = service.sendOnce({ from: "c", to: "w", body: { beat: 2 }, dedupeKey: "heartbeat" }); + expect(id1).toBe(id2); // Idempotent + }); + + test("validateMessageBody_rejects_deep_objects", () => { + const deep = { a: { b: { c: { d: { e: {} } } } }; + expect(() => validateMessageBody(deep, 0, MAX_BODY_DEPTH)).toThrow("BODY_TOO_DEEP"); + }); + + test("validateMessageBody_rejects_prototype_pollution", () => { + expect(() => validateMessageBody({ "__proto__": { evil: true } }, 0)) + .toThrow("FORBIDDEN_KEY"); + }); +}); +``` + +### 7.2 Integration Tests + +```typescript +// File: src/resources/extensions/sf/tests/a2a-integration.test.ts + +describe("A2A Integration", () => { + test("worker_registers_and_receives_task", async () => { + const { coordinator, worker, service } = setupTwoAgentSystem(); + + // Worker starts, registers + await worker.start(); + await waitFor(() => registry.getStatus("worker:1") === "online"); + + // Coordinator sends task + service.send({ + from: "coordinator", + to: "worker:1", + body: { type: "task_submitted", taskId: "M01" }, + }); + + // Worker receives + const msg = await worker.waitForMessage("task_submitted"); + expect(msg.body.taskId).toBe("M01"); + }); + + test("worker_crash_does_not_lose_messages", async () => { + const { coordinator, worker, service } = setupTwoAgentSystem(); + await worker.start(); + + service.send({ from: "coordinator", to: "worker:1", body: { type: "task_submitted" } }); + + // Worker crashes and restarts + await worker.kill(); + await worker.start(); + + // Message should still be in inbox after restart + const msg = await worker.waitForMessage("task_submitted"); + 
expect(msg).toBeDefined(); + }); + + test("coordinator_receives_worker_heartbeat", async () => { + const { coordinator, worker, service } = setupTwoAgentSystem(); + await worker.start(); + + worker.sendHeartbeat(); + + const msg = await coordinator.waitForMessage("worker.heartbeat"); + expect(msg.from).toBe("worker:1"); + }); +}); +``` + +### 7.3 Chaos Tests + +```typescript +// File: src/resources/extensions/sf/tests/a2a-chaos.test.ts + +describe("A2A Chaos", () => { + test("messages_delivered_despite_slow_worker", async () => { + // Worker is slow to process (simulate 10s processing time) + worker.simulateSlowProcessing(10_000); + + // Send 100 messages while worker is slow + const sends = Array.from({ length: 100 }, (_, i) => + service.send({ from: "c", to: "w", body: { seq: i } }), + ); + const results = await Promise.allSettled(sends); + + // All succeed (buffered, not rejected) + expect(results.filter(r => r.status === "fulfilled")).toHaveLength(100); + + // Worker processes all after recovery + worker.simulateFastProcessing(); + await worker.processAllBuffered(); + + const received = await worker.getAllMessages(); + expect(received).toHaveLength(100); + }); + + test("panic_mode_activates_on_repeated_failure", async () => { + bus.simulatePermanentFailure(); + + for (let i = 0; i < 3; i++) { + try { + await service.send({ from: "c", to: "w", body: { test: true } }); + } catch {} + } + + // Panic mode should be active + expect(service.isPanicMode).toBe(true); + // File-based fallback should be active + expect(sessionStatusSignalWasUsed()).toBe(true); + }); +}); +``` + +--- + +## 8. 
Rollback Procedures + +### 8.1 Feature Flag + +All A2A behavior is gated by `SF_A2A_ENABLED`: + +```typescript +// File: src/resources/extensions/sf/dispatch/service.ts + +const A2A_ENABLED = process.env.SF_A2A_ENABLED === "1"; + +export class DispatchService { + private messageService: A2AMessageService | null = null; + + constructor(opts: DispatchOptions) { + if (A2A_ENABLED) { + this.messageService = new A2AMessageService(opts.basePath, this.registry); + } + // ... + } + + async pause(workerId: string): Promise { + if (this.messageService && A2A_ENABLED) { + await this.messageService.send({ + from: "coordinator", + to: workerId, + body: { type: "control", action: "pause" }, + metadata: { priority: "high" }, + }); + } else { + // Legacy file-based signal + sendSignal(this.basePath, workerId, "pause"); + } + } +} +``` + +### 8.2 Per-Phase Rollback + +| Phase | Rollback | +|---|---| +| Phase 1: A2A adapter types | Delete `a2a-types.ts`, `a2a-task.ts`. No behavior change — code not wired yet. | +| Phase 2: AgentRegistry | Delete `capability-registry.ts`. Remove registry from `DispatchService` constructor. No behavior change. | +| Phase 3: MessageBus wiring | Set `SF_A2A_ENABLED=0`. File-based IPC (`sendSignal`) is the automatic fallback. | +| Phase 4: Subagent A2A | Delete `subagent/a2a.ts`. Restore original `subagent/index.js` from git. | +| Phase 5: UOK kernel A2A | Revert `uok/kernel.js` to pre-Phase-5 state from git. | +| Phase 6: Fallback removal | `session-status-io.js` is never removed — it stays as crash-recovery fallback permanently. | + +### 8.3 Emergency Rollback + +```bash +# Emergency: disable A2A entirely +SF_A2A_ENABLED=0 sf headless autonomous + +# Emergency: revert to specific phase +git stash +git checkout phase2-end # tag or branch at end of Phase 2 +SF_A2A_ENABLED=0 sf headless autonomous + +# Verify rollback +npx vitest run src/resources/extensions/sf/tests/uok-message-bus.test.mjs +``` + +--- + +## 9. 
Migration Phases (Detailed) + +### Phase 1: A2A Type Definitions (Week 1-2) +**Risk: Zero | Behavior: identical** + +``` +Files created: + dispatch/a2a-types.ts — A2A types + SF extensions + dispatch/a2a-task.ts — Task creation + state mapping + dispatch/a2a-errors.ts — DeliveryError + error codes + +Files modified: + None (types are additive, not wired) +``` + +**Verification:** +```bash +npx tsc --noEmit src/resources/extensions/sf/dispatch/a2a-types.ts +npx vitest run src/resources/extensions/sf/dispatch/a2a-task.test.ts +``` + +--- + +### Phase 2: AgentRegistry (Week 2-3) +**Risk: Low | Behavior: additive** + +``` +Files created: + dispatch/capability-registry.ts — AgentRegistry + SF_CAPABILITY_DEFINITIONS + +Files modified: + dispatch/service.ts — Add registry to DispatchService (opt-in via feature flag) + dispatch/index.ts — Export new types +``` + +**Verification:** +```bash +npx vitest run src/resources/extensions/sf/dispatch/capability-registry.test.ts +SF_A2A_ENABLED=0 npm run test:unit # existing tests pass +``` + +--- + +### Phase 3: MessageBus Wiring (Week 3-4) +**Risk: Medium | Behavior: pause/resume/stop now use MessageBus** + +``` +Files created: + dispatch/a2a-service.ts — A2AMessageService wrapping MessageBus + +Files modified: + dispatch/service.ts — Wire MessageBus into pause/resume/stop + dispatch/worker-*.ts — Register AgentCard on spawn + session-status-io.ts — Mark as crash-recovery fallback (never primary) +``` + +**Before:** `sendSignal(basePath, id, "pause")` → signal file +**After:** `messageService.send({ from, to, body: { type: "control", action: "pause" }, priority: HIGH })` +**Fallback:** File signal if MessageBus delivery fails 3 times + +**Verification:** +```bash +SF_A2A_ENABLED=1 npx vitest run src/resources/extensions/sf/tests/a2a-integration.test.ts +SF_A2A_ENABLED=0 npm run test:unit # existing tests pass +``` + +--- + +### Phase 4: Subagent A2A (Week 4-5) +**Risk: Medium | Behavior: subagent modes unchanged** + +``` +Files 
modified: + subagent/index.ts — Use DispatchService internally + dispatch/service.ts — Handle isolation: constrained +``` + +**Verification:** +```bash +SF_A2A_ENABLED=1 npx vitest run src/resources/extensions/sf/tests/subagent-a2a.test.ts +SF_A2A_ENABLED=0 npm run test:unit # existing tests pass +``` + +--- + +### Phase 5: UOK Kernel A2A (Week 5-6) +**Risk: Medium | Behavior: UOK autonomous loop uses A2A** + +``` +Files modified: + uok/kernel.ts — Use DispatchService + A2AMessageService + uok/index.ts — Export new A2A types +``` + +**Verification:** +```bash +SF_A2A_ENABLED=1 npm run test:integration # Full integration suite +SF_A2A_ENABLED=0 npm run test:integration # Legacy still works +``` + +--- + +### Phase 6: A2A Default On (Week 6-7) +**Risk: Low | Behavior: A2A is now the default** + +``` +Actions: + 1. Set SF_A2A_ENABLED=1 as default in preferences + 2. Document in CHANGELOG.md + 3. Monitor for 1 week before declaring stable +``` + +--- + +## 10. Operational Runbooks + +### 10.1 Dispatch Degraded + +**Symptoms:** Dashboard shows "dispatch degraded"; `sf_dispatch_degraded` events in journal + +**Diagnosis:** +```bash +# Check MessageBus health +node -e "import('./src/resources/extensions/sf/uok/message-bus.js').then(m => { + const metrics = m.getUokMessageBusMetrics(); + console.log(JSON.stringify(metrics, null, 2)); +})" + +# Check for panic mode +cat .sf/journal/*.jsonl | jq 'select(.event == "a2a.panic_mode")' | tail -5 +``` + +**Fix:** +```bash +# Switch to file-based IPC temporarily +SF_A2A_ENABLED=0 sf headless autonomous + +# Restart with A2A off +sf headless autonomous + +# After fix: re-enable A2A +sf config set SF_A2A_ENABLED=1 +``` + +### 10.2 Worker Not Receiving Messages + +**Symptoms:** Worker shows "offline" but process is running + +**Diagnosis:** +```bash +# Check worker AgentCard registration +curl -s http://localhost:3030/api/dispatch/agents | jq '.[] | select(.role == "worker")' + +# Check worker inbox size +node -e "const m = 
require('./src/resources/extensions/sf/dispatch/metrics'); m.getInboxMetrics('worker:M01')" + +# Check MessageBus delivery latency +cat .sf/journal/*.jsonl | jq 'select(.event == "a2a.delivery_failed")' | tail -20 +``` + +**Fix:** +```bash +# Restart the worker process +sf parallel stop M01 +sf parallel start M01 + +# Or: send SIGUSR1 to worker to re-register its AgentCard +kill -USR1 $(pgrep -f "sf.*M01") +``` + +### 10.3 Inbox Overflow + +**Symptoms:** `"INBOX_OVERFLOW"` errors in logs; workers missing messages + +**Diagnosis:** +```bash +# Find overflowing inboxes +node -e "import('./src/resources/extensions/sf/dispatch/metrics').then(m => { + Object.entries(m.getAllInboxSizes()).forEach(([id, size]) => { + if (size > 900) console.log(id, size); + }); +})" +``` + +**Fix:** +```bash +# Compact all message buses (removes messages older than retention) +sf uok messages compact + +# Or: increase inbox size limit temporarily +SF_INBOX_MAX_SIZE=5000 sf headless autonomous +``` + +--- + +## 11. Performance Targets + +| Metric | Target | Critical Threshold | +|---|---|---| +| Message delivery latency (local) | < 50ms p50, < 500ms p99 | > 2000ms | +| Inbox delivery for 100 parallel workers | < 5s end-to-end | > 15s | +| Agent registration time | < 100ms | > 1000ms | +| Message throughput | > 1000 msg/s per coordinator | < 100 msg/s | +| Memory per worker (idle) | < 50 MB | > 200 MB | +| Memory per coordinator (10 workers) | < 200 MB | > 500 MB | +| DB WAL size growth | < 10 MB/day | > 100 MB/day | +| Recovery time after coordinator crash | < 5s | > 30s | + +--- + +## 12. 
File Manifest + +### New Files + +| File | Lines (est) | Purpose | +|---|---|---| +| `dispatch/a2a-types.ts` | 120 | Core A2A types + SF extensions | +| `dispatch/a2a-task.ts` | 80 | Task creation + state mapping | +| `dispatch/a2a-errors.ts` | 60 | DeliveryError + error codes | +| `dispatch/a2a-service.ts` | 250 | A2AMessageService wrapping MessageBus | +| `dispatch/capability-registry.ts` | 180 | AgentRegistry + SF_CAPABILITY_DEFINITIONS | +| `dispatch/metrics.ts` | 60 | A2A Prometheus metrics | +| `dispatch/logger.ts` | 40 | A2A structured logging | +| `dispatch/validation.ts` | 70 | Message body validation | +| `dispatch/auth.ts` | 50 | Agent token generation + verification | +| `dispatch/index.ts` | 30 | Barrel exports | +| `dispatch/a2a-service.test.ts` | 200 | Unit tests | +| `tests/a2a-integration.test.ts` | 300 | Integration tests | +| `tests/a2a-chaos.test.ts` | 150 | Chaos tests | +| **Total new** | **~1600 LOC** | | + +### Modified Files + +| File | Change | +|---|---| +| `dispatch/service.ts` | Add registry + messageService; wire pause/resume/stop | +| `dispatch/worker-orchestrator.ts` | Register AgentCard on spawn; open AgentInbox | +| `uok/kernel.ts` | Register coordinator AgentCard; use DispatchService | +| `uok/message-bus.js` | Add AgentCard types (no behavior change) | +| `uok/index.ts` | Export A2A types | +| `subagent/index.ts` | Use DispatchService; remove ~600 LOC spawn management | +| `session-status-io.ts` | Mark as crash-recovery fallback only | + +--- + +## Summary + +| Question | Answer | +|---|---| +| A2A as internal protocol | YES — Task state, priority, capability discovery | +| Transport | SQLite MessageBus (not HTTP/WebSocket) | +| External A2A | Optional; wired later | +| Feature flag | `SF_A2A_ENABLED` gates all behavior | +| Migration | 6 phases; each independently rollback-safe | +| Error handling | Retry with exponential backoff; panic mode with file-based fallback | +| Backpressure | Per-inbox limits; coordinator outbox 
batching | +| Observability | Prometheus metrics + structured JSONL logging | +| Security | Agent tokens, input validation, capability enforcement | +| Testing | Unit + integration + chaos tests for every phase | +| Rollback | `SF_A2A_ENABLED=0` disables all new behavior instantly | diff --git a/docs/plans/MISSION-A2A-ADOPTION.md b/docs/plans/MISSION-A2A-ADOPTION.md new file mode 100644 index 000000000..f87854d07 --- /dev/null +++ b/docs/plans/MISSION-A2A-ADOPTION.md @@ -0,0 +1,124 @@ +# Mission: Adopt A2A as Internal Agent Communication Protocol + +## Metadata +- **Created:** 2026-05-08 +- **Status:** Proposed (not started) +- **Source:** Research from 4+ model consultations + codebase analysis + cross-repo pattern study +- **Plans:** `docs/plans/A2A_ADOPTION_PLAN.md`, `docs/plans/UNIFIED_DISPATCH_V2.md`, `docs/plans/dispatch-orchestration-architecture.md` + +--- + +## Goal + +Consolidate SF's 5 dispatch mechanisms + MessageBus into a unified dispatch architecture using A2A as the semantic communication layer, with MessageBus as the transport. + +--- + +## Context + +### Problem +SF has accumulated 5 dispatch mechanisms plus the MessageBus without a unifying abstraction: +1. **subagent tool** — inline delegation, 4 tools, no project DB +2. **parallel-orchestrator** — milestone-level parallelism, full tools, shared SQLite WAL +3. **slice-parallel-orchestrator** — same pattern at slice scope (~80% duplicate) +4. **UOK kernel** — autonomous loop controller +5. **MessageBus** — SQLite-backed durable messaging (well-implemented but not wired) +6. 
**Cmux** — surface integration (keep separate) + +### Evidence +- `parallel-orchestrator.js` + `slice-parallel-orchestrator.js` share ~80% logic +- MessageBus has zero references in parallel-orchestrator (coordination via file-based IPC) +- `@a2a-js/sdk@0.3.11` already in node_modules (transitive dep of `@google/gemini-cli-core`) +- All 4 consulted models converged: merge the orchestrators first, wire MessageBus, keep subagent constraint + +### Constraints +- Must preserve existing behavior throughout migration (feature-flagged rollout) +- `SF_A2A_ENABLED=0` must disable all new A2A behavior instantly +- subagent isolation (4 tools, no project DB writes) is NOT changed +- File-based IPC (`session-status-io.js`) stays as permanent crash-recovery fallback +- DB is authoritative; A2A messages are hints + +--- + +## Mission Tasks + +### Phase 1 — A2A Type Definitions (Week 1-2) +- [ ] Create `dispatch/a2a-types.ts` — A2A types + SF extensions (A2ATaskState, SFTaskExtension, MessagePriority, SFAgentCapabilities, SFAgentCard) +- [ ] Create `dispatch/a2a-task.ts` — Task creation + state mapping functions +- [ ] Create `dispatch/a2a-errors.ts` — DeliveryError + error codes +- [ ] Run: `npx tsc --noEmit` on new types +- [ ] Run: `npx vitest run dispatch/a2a-task.test.ts` + +### Phase 2 — AgentRegistry (Week 2-3) +- [ ] Create `dispatch/capability-registry.ts` — AgentRegistry + SF_CAPABILITY_DEFINITIONS +- [ ] Create `dispatch/index.ts` — barrel exports +- [ ] Wire registry into `DispatchService` (opt-in via `SF_A2A_ENABLED=1`) +- [ ] Run: unit tests pass +- [ ] Run: existing tests pass with `SF_A2A_ENABLED=0` + +### Phase 3 — MessageBus Wiring (Week 3-4) +- [ ] Create `dispatch/a2a-service.ts` — A2AMessageService wrapping MessageBus +- [ ] Create `dispatch/metrics.ts` — A2A Prometheus metrics +- [ ] Create `dispatch/logger.ts` — structured JSONL logging +- [ ] Create `dispatch/validation.ts` — message body validation +- [ ] Create `dispatch/auth.ts` — agent token generation 
+ verification +- [ ] Replace file-based IPC with MessageBus.send() for pause/resume/stop +- [ ] Keep file-based IPC as crash-recovery fallback +- [ ] Run: integration tests with `SF_A2A_ENABLED=1` +- [ ] Run: existing tests pass with `SF_A2A_ENABLED=0` + +### Phase 4 — Subagent A2A (Week 4-5) +- [ ] Refactor `subagent/index.ts` to use DispatchService +- [ ] Remove ~600 LOC spawn management (replaced by `dispatch.start()`) +- [ ] Verify all 4 subagent modes work identically +- [ ] Add optional MessageBus inbox for subagents (`useMessageBus: true`) +- [ ] Run: subagent tests with `SF_A2A_ENABLED=1` + +### Phase 5 — UOK Kernel A2A (Week 5-6) +- [ ] Register coordinator AgentCard in `uok/kernel.ts` +- [ ] Replace `startParallel`/`startSliceParallel` calls with DispatchService +- [ ] Verify `sf headless autonomous` works identically +- [ ] Run: integration tests with `SF_A2A_ENABLED=1` + +### Phase 6 — A2A Default On (Week 6-7) +- [ ] Set `SF_A2A_ENABLED=1` as default in preferences +- [ ] Monitor for 1 week +- [ ] Promote to stable + +--- + +## Verification Criteria + +1. `npx vitest run src/resources/extensions/sf/tests/uok-message-bus.test.mjs` passes throughout +2. `SF_A2A_ENABLED=0 npm run test:unit` passes throughout (legacy preserved) +3. Pause/resume works via MessageBus with `SF_A2A_ENABLED=1` +4. All 4 subagent modes work identically (parallel, debate, chain, single) +5. `sf headless autonomous` works with `SF_A2A_ENABLED=1` +6. Worker AgentCards visible in dashboard +7. Panic mode activates correctly (3 consecutive delivery failures → file-based fallback) +8. `sf uok messages compact` works +9. 
`sf dispatch agents list` shows registered agents with capabilities + +--- + +## Key Files + +| File | Role | +|---|---| +| `src/resources/extensions/sf/uok/message-bus.js` | Existing MessageBus (transport) | +| `src/resources/extensions/sf/dispatch/service.js` | DispatchService (A2A coordinator client) | +| `src/resources/extensions/sf/worktree-orchestrator.js` | Worker spawner (A2A agent) | +| `src/resources/extensions/sf/uok/kernel.js` | UOK kernel (A2A coordinator) | +| `src/resources/extensions/subagent/index.js` | Subagent tool (A2A constrained agent) | +| `docs/plans/A2A_ADOPTION_PLAN.md` | Full production-grade plan | + +--- + +## Exit Criteria + +Mission is complete when: +- All 6 phases are merged to main +- `SF_A2A_ENABLED=1` is the default +- `SF_A2A_ENABLED=0` still passes all tests (legacy preserved) +- 0 P0/P1 bugs in dispatch layer for 2 consecutive weeks +- A2A observability dashboard shows healthy metrics (delivery latency < 500ms p99) diff --git a/flake.nix b/flake.nix index 9605ec9d7..43682a12d 100644 --- a/flake.nix +++ b/flake.nix @@ -33,7 +33,6 @@ git just libsecret - nodePackages.jscpd pkg-config protobuf rust-analyzer