singularity-forge/src/resources/extensions/sf/uok/message-bus.js

240 lines
6.1 KiB
JavaScript

/**
* UOK Durable Message Bus & Agent Inbox
*
* Purpose: implement Letta-style inter-agent communication with at-least-once
* delivery guarantees via SQLite. Messages survive process restarts and are
* retained with configurable TTL.
*
* Consumer: multi-agent orchestration, cross-turn coordination, and UOK kernel
* observer chains.
*/
import { createHash, randomUUID } from "node:crypto";
import { mkdirSync } from "node:fs";
import { join } from "node:path";
import { sfRoot } from "../paths.js";
import {
compactUokMessages,
getUokConversation,
getUokMessageBusMetrics,
getUokMessageReadIds,
getUokMessagesForAgent,
insertUokMessage,
markUokMessageRead,
openDatabase,
} from "../sf-db.js";
const DEFAULT_RETENTION_DAYS = 7;
const DEFAULT_MAX_INBOX_SIZE = 1000;
const INBOX_REFRESH_INTERVAL_MS = 30_000; // Refresh from DB every 30s
function deterministicMessageId(key) {
const digest = createHash("sha256").update(String(key)).digest("hex");
return `msg-${digest.slice(0, 32)}`;
}
function ensureDb(basePath) {
const dir = sfRoot(basePath);
mkdirSync(dir, { recursive: true });
openDatabase(join(dir, "sf.db"));
}
export class AgentInbox {
constructor(agentId, basePath, options = {}) {
this.agentId = agentId;
this.basePath = basePath;
this.maxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE;
this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS;
this._refreshIntervalMs =
options.refreshIntervalMs ?? INBOX_REFRESH_INTERVAL_MS;
this._lastRefresh = 0;
ensureDb(basePath);
this._messages = this._hydrate();
}
_hydrate() {
const messages = getUokMessagesForAgent(this.agentId, this.maxSize, false);
const readIds = new Set(getUokMessageReadIds(this.agentId));
const withRead = messages.map((m) => ({
...m,
read: readIds.has(m.id),
}));
const cutoff = Date.now() - this.retentionDays * 24 * 60 * 60 * 1000;
const recent = withRead.filter((m) => {
const ts = m.sentAt ? Date.parse(m.sentAt) : 0;
return ts >= cutoff;
});
return recent.slice(-this.maxSize);
}
receive(message) {
const enriched = {
...message,
receivedAt: new Date().toISOString(),
read: false,
};
insertUokMessage({
id: enriched.id,
from: enriched.from,
to: enriched.to ?? this.agentId,
body: enriched.body,
metadata: enriched.metadata,
sentAt: enriched.sentAt ?? enriched.receivedAt,
deliveredAt: enriched.deliveredAt ?? enriched.receivedAt,
});
this._messages.push(enriched);
if (this._messages.length > this.maxSize) {
this._messages = this._messages.slice(-this.maxSize);
}
return enriched;
}
_maybeRefresh() {
const now = Date.now();
if (now - this._lastRefresh >= this._refreshIntervalMs) {
this.refresh();
this._lastRefresh = now;
}
}
list(unreadOnly = false) {
this._maybeRefresh();
return unreadOnly
? this._messages.filter((m) => !m.read)
: [...this._messages];
}
markRead(messageId) {
this._maybeRefresh();
const msg = this._messages.find((m) => m.id === messageId);
if (msg) {
msg.read = true;
markUokMessageRead(messageId, this.agentId);
}
return !!msg;
}
get unreadCount() {
this._maybeRefresh();
return this._messages.filter((m) => !m.read).length;
}
refresh() {
this._messages = this._hydrate();
this._lastRefresh = Date.now();
}
}
const DEFAULT_AUTO_COMPACT_THRESHOLD = 10_000;
export class MessageBus {
constructor(basePath, options = {}) {
this.basePath = basePath;
this.retentionDays = options.retentionDays ?? DEFAULT_RETENTION_DAYS;
this.maxInboxSize = options.maxInboxSize ?? DEFAULT_MAX_INBOX_SIZE;
this.autoCompactThreshold =
options.autoCompactThreshold ?? DEFAULT_AUTO_COMPACT_THRESHOLD;
this.inboxes = new Map();
ensureDb(basePath);
}
_maybeAutoCompact() {
if (this.autoCompactThreshold <= 0) return;
const metrics = getUokMessageBusMetrics();
if (metrics.totalMessages >= this.autoCompactThreshold) {
compactUokMessages(this.retentionDays);
}
}
_getOrCreateInbox(agentId) {
if (!this.inboxes.has(agentId)) {
this.inboxes.set(
agentId,
new AgentInbox(agentId, this.basePath, {
retentionDays: this.retentionDays,
maxInboxSize: this.maxInboxSize,
}),
);
}
return this.inboxes.get(agentId);
}
send(from, to, body, metadata = {}) {
const message = {
id: `msg-${randomUUID()}`,
from,
to,
body,
metadata,
sentAt: new Date().toISOString(),
deliveredAt: new Date().toISOString(),
};
insertUokMessage(message);
const targetInbox = this._getOrCreateInbox(to);
const alreadyHas = targetInbox.list().some((m) => m.id === message.id);
if (!alreadyHas) {
targetInbox.receive(message);
}
this._maybeAutoCompact();
return message.id;
}
/**
* Send an idempotent message keyed by a stable event identity.
*
* Purpose: let recurring diagnostics and observer chains publish durable
* notifications without flooding an inbox on every status/doctor poll.
*
* Consumer: UOK diagnostics when surfacing repeated runtime health issues.
*/
sendOnce(from, to, body, metadata = {}, dedupeKey) {
const key = dedupeKey ?? `${from}:${to}:${body}`;
const messageId = deterministicMessageId(key);
// Check if message already exists in inbox before inserting
const targetInbox = this._getOrCreateInbox(to);
const alreadyHas = targetInbox.list().some((m) => m.id === messageId);
if (alreadyHas) {
return messageId; // Idempotent: return existing message id
}
const message = {
id: messageId,
from,
to,
body,
metadata: { ...metadata, dedupeKey: key },
sentAt: new Date().toISOString(),
deliveredAt: new Date().toISOString(),
};
insertUokMessage(message);
targetInbox.receive(message);
this._maybeAutoCompact();
return messageId;
}
broadcast(from, recipients, body, metadata = {}) {
const ids = [];
for (const to of recipients) {
ids.push(this.send(from, to, body, metadata));
}
return ids;
}
getInbox(agentId) {
return this._getOrCreateInbox(agentId);
}
getConversation(agentA, agentB) {
// DB returns DESC; reverse to chronological order (oldest first)
return getUokConversation(agentA, agentB, this.maxInboxSize).reverse();
}
compact() {
return compactUokMessages(this.retentionDays);
}
}