Files
Moltbot/src/gateway/server-node-events.ts
2026-02-15 18:02:55 +00:00

284 lines
9.1 KiB
TypeScript

import { randomUUID } from "node:crypto";
import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js";
import { normalizeChannelId } from "../channels/plugins/index.js";
import { agentCommand } from "../commands/agent.js";
import { loadConfig } from "../config/config.js";
import { updateSessionStore } from "../config/sessions.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import {
loadSessionEntry,
pruneLegacyStoreKeys,
resolveGatewaySessionStoreTarget,
} from "./session-utils.js";
import { formatForLog } from "./ws-log.js";
const MAX_EXEC_EVENT_OUTPUT_CHARS = 180;
function compactExecEventOutput(raw: string) {
const normalized = raw.replace(/\s+/g, " ").trim();
if (!normalized) {
return "";
}
if (normalized.length <= MAX_EXEC_EVENT_OUTPUT_CHARS) {
return normalized;
}
const safe = Math.max(1, MAX_EXEC_EVENT_OUTPUT_CHARS - 1);
return `${normalized.slice(0, safe)}`;
}
type LoadedSessionEntry = ReturnType<typeof loadSessionEntry>;
async function touchSessionStore(params: {
cfg: ReturnType<typeof loadConfig>;
sessionKey: string;
storePath: LoadedSessionEntry["storePath"];
canonicalKey: LoadedSessionEntry["canonicalKey"];
entry: LoadedSessionEntry["entry"];
sessionId: string;
now: number;
}) {
const { storePath } = params;
if (!storePath) {
return;
}
await updateSessionStore(storePath, (store) => {
const target = resolveGatewaySessionStoreTarget({
cfg: params.cfg,
key: params.sessionKey,
store,
});
pruneLegacyStoreKeys({
store,
canonicalKey: target.canonicalKey,
candidates: target.storeKeys,
});
store[params.canonicalKey] = {
sessionId: params.sessionId,
updatedAt: params.now,
thinkingLevel: params.entry?.thinkingLevel,
verboseLevel: params.entry?.verboseLevel,
reasoningLevel: params.entry?.reasoningLevel,
systemSent: params.entry?.systemSent,
sendPolicy: params.entry?.sendPolicy,
lastChannel: params.entry?.lastChannel,
lastTo: params.entry?.lastTo,
};
});
}
function parseSessionKeyFromPayloadJSON(payloadJSON: string): string | null {
let payload: unknown;
try {
payload = JSON.parse(payloadJSON) as unknown;
} catch {
return null;
}
if (typeof payload !== "object" || payload === null) {
return null;
}
const obj = payload as Record<string, unknown>;
const sessionKey = typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
return sessionKey.length > 0 ? sessionKey : null;
}
export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt: NodeEvent) => {
switch (evt.event) {
case "voice.transcript": {
if (!evt.payloadJSON) {
return;
}
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
const text = typeof obj.text === "string" ? obj.text.trim() : "";
if (!text) {
return;
}
if (text.length > 20_000) {
return;
}
const sessionKeyRaw = typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
const cfg = loadConfig();
const rawMainKey = normalizeMainKey(cfg.session?.mainKey);
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now });
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
ctx.addChatRun(sessionId, {
sessionKey: canonicalKey,
clientRunId: `voice-${randomUUID()}`,
});
void agentCommand(
{
message: text,
sessionId,
sessionKey: canonicalKey,
thinking: "low",
deliver: false,
messageChannel: "node",
},
defaultRuntime,
ctx.deps,
).catch((err) => {
ctx.logGateway.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
});
return;
}
case "agent.request": {
if (!evt.payloadJSON) {
return;
}
type AgentDeepLink = {
message?: string;
sessionKey?: string | null;
thinking?: string | null;
deliver?: boolean;
to?: string | null;
channel?: string | null;
timeoutSeconds?: number | null;
key?: string | null;
};
let link: AgentDeepLink | null = null;
try {
link = JSON.parse(evt.payloadJSON) as AgentDeepLink;
} catch {
return;
}
const message = (link?.message ?? "").trim();
if (!message) {
return;
}
if (message.length > 20_000) {
return;
}
const channelRaw = typeof link?.channel === "string" ? link.channel.trim() : "";
const channel = normalizeChannelId(channelRaw) ?? undefined;
const to = typeof link?.to === "string" && link.to.trim() ? link.to.trim() : undefined;
const deliver = Boolean(link?.deliver) && Boolean(channel);
const sessionKeyRaw = (link?.sessionKey ?? "").trim();
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`;
const cfg = loadConfig();
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now });
void agentCommand(
{
message,
sessionId,
sessionKey: canonicalKey,
thinking: link?.thinking ?? undefined,
deliver,
to,
channel,
timeout:
typeof link?.timeoutSeconds === "number" ? link.timeoutSeconds.toString() : undefined,
messageChannel: "node",
},
defaultRuntime,
ctx.deps,
).catch((err) => {
ctx.logGateway.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
});
return;
}
case "chat.subscribe": {
if (!evt.payloadJSON) {
return;
}
const sessionKey = parseSessionKeyFromPayloadJSON(evt.payloadJSON);
if (!sessionKey) {
return;
}
ctx.nodeSubscribe(nodeId, sessionKey);
return;
}
case "chat.unsubscribe": {
if (!evt.payloadJSON) {
return;
}
const sessionKey = parseSessionKeyFromPayloadJSON(evt.payloadJSON);
if (!sessionKey) {
return;
}
ctx.nodeUnsubscribe(nodeId, sessionKey);
return;
}
case "exec.started":
case "exec.finished":
case "exec.denied": {
if (!evt.payloadJSON) {
return;
}
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
const sessionKey =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : `node-${nodeId}`;
if (!sessionKey) {
return;
}
const runId = typeof obj.runId === "string" ? obj.runId.trim() : "";
const command = typeof obj.command === "string" ? obj.command.trim() : "";
const exitCode =
typeof obj.exitCode === "number" && Number.isFinite(obj.exitCode)
? obj.exitCode
: undefined;
const timedOut = obj.timedOut === true;
const output = typeof obj.output === "string" ? obj.output.trim() : "";
const reason = typeof obj.reason === "string" ? obj.reason.trim() : "";
let text = "";
if (evt.event === "exec.started") {
text = `Exec started (node=${nodeId}${runId ? ` id=${runId}` : ""})`;
if (command) {
text += `: ${command}`;
}
} else if (evt.event === "exec.finished") {
const exitLabel = timedOut ? "timeout" : `code ${exitCode ?? "?"}`;
const compactOutput = compactExecEventOutput(output);
const shouldNotify = timedOut || exitCode !== 0 || compactOutput.length > 0;
if (!shouldNotify) {
return;
}
text = `Exec finished (node=${nodeId}${runId ? ` id=${runId}` : ""}, ${exitLabel})`;
if (compactOutput) {
text += `\n${compactOutput}`;
}
} else {
text = `Exec denied (node=${nodeId}${runId ? ` id=${runId}` : ""}${reason ? `, ${reason}` : ""})`;
if (command) {
text += `: ${command}`;
}
}
enqueueSystemEvent(text, { sessionKey, contextKey: runId ? `exec:${runId}` : "exec" });
requestHeartbeatNow({ reason: "exec-event" });
return;
}
default:
return;
}
};