* fix(gateway): normalize session key casing to prevent ghost sessions on Linux On case-sensitive filesystems (Linux), mixed-case session keys like agent:ops:MySession and agent:ops:mysession resolve to different store entries, creating ghost duplicates that never converge. Core changes in session-utils.ts: - resolveSessionStoreKey: lowercase all session key components - canonicalizeSpawnedByForAgent: accept cfg, resolve main-alias references via canonicalizeMainSessionAlias after lowercasing - loadSessionEntry: return legacyKey only when it differs from canonicalKey - resolveGatewaySessionStoreTarget: scan store for case-insensitive matches; add optional scanLegacyKeys param to skip disk reads for read-only callers - Export findStoreKeysIgnoreCase for use by write-path consumers - Compare global/unknown sentinels case-insensitively in all canonicalization functions sessions-resolve.ts: - Make resolveSessionKeyFromResolveParams async for inline migration - Check canonical key first (fast path), then fall back to legacy scan - Delete ALL legacy case-variant keys in a single updateSessionStore pass Fixes #12603 * fix(gateway): propagate canonical keys and clean up all case variants on write paths - agent.ts: use canonicalizeSpawnedByForAgent (with cfg) instead of raw toLowerCase; use findStoreKeysIgnoreCase to delete all legacy variants on store write; pass canonicalKey to addChatRun, registerAgentRunContext, resolveSendPolicy, and agentCommand - sessions.ts: replace single-key migration with full case-variant cleanup via findStoreKeysIgnoreCase in patch/reset/delete/compact handlers; add case-insensitive fallback in preview (store already loaded); make sessions.resolve handler async; pass scanLegacyKeys: false in preview - server-node-events.ts: use findStoreKeysIgnoreCase to clean all legacy variants on voice.transcript and agent.request write paths; pass canonicalKey to addChatRun and agentCommand * test(gateway): add session key case-normalization tests Cover the case-insensitive session key canonicalization logic: - resolveSessionStoreKey normalizes mixed-case bare and prefixed keys - resolveSessionStoreKey resolves mixed-case main aliases (MAIN, Main) - resolveGatewaySessionStoreTarget includes legacy mixed-case store keys - resolveGatewaySessionStoreTarget collects all case-variant duplicates - resolveGatewaySessionStoreTarget finds legacy main alias keys with customized mainKey configuration All 5 tests fail before the production changes, pass after. * fix: clean legacy session alias cleanup gaps (openclaw#12846) thanks @mcaxtr --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
266 lines
8.8 KiB
TypeScript
266 lines
8.8 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js";
|
|
import { normalizeChannelId } from "../channels/plugins/index.js";
|
|
import { agentCommand } from "../commands/agent.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import { updateSessionStore } from "../config/sessions.js";
|
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
import { normalizeMainKey } from "../routing/session-key.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
import {
|
|
loadSessionEntry,
|
|
pruneLegacyStoreKeys,
|
|
resolveGatewaySessionStoreTarget,
|
|
} from "./session-utils.js";
|
|
import { formatForLog } from "./ws-log.js";
|
|
|
|
export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt: NodeEvent) => {
|
|
switch (evt.event) {
|
|
case "voice.transcript": {
|
|
if (!evt.payloadJSON) {
|
|
return;
|
|
}
|
|
let payload: unknown;
|
|
try {
|
|
payload = JSON.parse(evt.payloadJSON) as unknown;
|
|
} catch {
|
|
return;
|
|
}
|
|
const obj =
|
|
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
|
|
const text = typeof obj.text === "string" ? obj.text.trim() : "";
|
|
if (!text) {
|
|
return;
|
|
}
|
|
if (text.length > 20_000) {
|
|
return;
|
|
}
|
|
const sessionKeyRaw = typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
|
|
const cfg = loadConfig();
|
|
const rawMainKey = normalizeMainKey(cfg.session?.mainKey);
|
|
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
|
|
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
|
|
const now = Date.now();
|
|
const sessionId = entry?.sessionId ?? randomUUID();
|
|
if (storePath) {
|
|
await updateSessionStore(storePath, (store) => {
|
|
const target = resolveGatewaySessionStoreTarget({ cfg, key: sessionKey, store });
|
|
pruneLegacyStoreKeys({
|
|
store,
|
|
canonicalKey: target.canonicalKey,
|
|
candidates: target.storeKeys,
|
|
});
|
|
store[canonicalKey] = {
|
|
sessionId,
|
|
updatedAt: now,
|
|
thinkingLevel: entry?.thinkingLevel,
|
|
verboseLevel: entry?.verboseLevel,
|
|
reasoningLevel: entry?.reasoningLevel,
|
|
systemSent: entry?.systemSent,
|
|
sendPolicy: entry?.sendPolicy,
|
|
lastChannel: entry?.lastChannel,
|
|
lastTo: entry?.lastTo,
|
|
};
|
|
});
|
|
}
|
|
|
|
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
|
|
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
|
|
ctx.addChatRun(sessionId, {
|
|
sessionKey: canonicalKey,
|
|
clientRunId: `voice-${randomUUID()}`,
|
|
});
|
|
|
|
void agentCommand(
|
|
{
|
|
message: text,
|
|
sessionId,
|
|
sessionKey: canonicalKey,
|
|
thinking: "low",
|
|
deliver: false,
|
|
messageChannel: "node",
|
|
},
|
|
defaultRuntime,
|
|
ctx.deps,
|
|
).catch((err) => {
|
|
ctx.logGateway.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
|
|
});
|
|
return;
|
|
}
|
|
case "agent.request": {
|
|
if (!evt.payloadJSON) {
|
|
return;
|
|
}
|
|
type AgentDeepLink = {
|
|
message?: string;
|
|
sessionKey?: string | null;
|
|
thinking?: string | null;
|
|
deliver?: boolean;
|
|
to?: string | null;
|
|
channel?: string | null;
|
|
timeoutSeconds?: number | null;
|
|
key?: string | null;
|
|
};
|
|
let link: AgentDeepLink | null = null;
|
|
try {
|
|
link = JSON.parse(evt.payloadJSON) as AgentDeepLink;
|
|
} catch {
|
|
return;
|
|
}
|
|
const message = (link?.message ?? "").trim();
|
|
if (!message) {
|
|
return;
|
|
}
|
|
if (message.length > 20_000) {
|
|
return;
|
|
}
|
|
|
|
const channelRaw = typeof link?.channel === "string" ? link.channel.trim() : "";
|
|
const channel = normalizeChannelId(channelRaw) ?? undefined;
|
|
const to = typeof link?.to === "string" && link.to.trim() ? link.to.trim() : undefined;
|
|
const deliver = Boolean(link?.deliver) && Boolean(channel);
|
|
|
|
const sessionKeyRaw = (link?.sessionKey ?? "").trim();
|
|
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`;
|
|
const cfg = loadConfig();
|
|
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
|
|
const now = Date.now();
|
|
const sessionId = entry?.sessionId ?? randomUUID();
|
|
if (storePath) {
|
|
await updateSessionStore(storePath, (store) => {
|
|
const target = resolveGatewaySessionStoreTarget({ cfg, key: sessionKey, store });
|
|
pruneLegacyStoreKeys({
|
|
store,
|
|
canonicalKey: target.canonicalKey,
|
|
candidates: target.storeKeys,
|
|
});
|
|
store[canonicalKey] = {
|
|
sessionId,
|
|
updatedAt: now,
|
|
thinkingLevel: entry?.thinkingLevel,
|
|
verboseLevel: entry?.verboseLevel,
|
|
reasoningLevel: entry?.reasoningLevel,
|
|
systemSent: entry?.systemSent,
|
|
sendPolicy: entry?.sendPolicy,
|
|
lastChannel: entry?.lastChannel,
|
|
lastTo: entry?.lastTo,
|
|
};
|
|
});
|
|
}
|
|
|
|
void agentCommand(
|
|
{
|
|
message,
|
|
sessionId,
|
|
sessionKey: canonicalKey,
|
|
thinking: link?.thinking ?? undefined,
|
|
deliver,
|
|
to,
|
|
channel,
|
|
timeout:
|
|
typeof link?.timeoutSeconds === "number" ? link.timeoutSeconds.toString() : undefined,
|
|
messageChannel: "node",
|
|
},
|
|
defaultRuntime,
|
|
ctx.deps,
|
|
).catch((err) => {
|
|
ctx.logGateway.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
|
|
});
|
|
return;
|
|
}
|
|
case "chat.subscribe": {
|
|
if (!evt.payloadJSON) {
|
|
return;
|
|
}
|
|
let payload: unknown;
|
|
try {
|
|
payload = JSON.parse(evt.payloadJSON) as unknown;
|
|
} catch {
|
|
return;
|
|
}
|
|
const obj =
|
|
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
|
|
const sessionKey = typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
|
|
if (!sessionKey) {
|
|
return;
|
|
}
|
|
ctx.nodeSubscribe(nodeId, sessionKey);
|
|
return;
|
|
}
|
|
case "chat.unsubscribe": {
|
|
if (!evt.payloadJSON) {
|
|
return;
|
|
}
|
|
let payload: unknown;
|
|
try {
|
|
payload = JSON.parse(evt.payloadJSON) as unknown;
|
|
} catch {
|
|
return;
|
|
}
|
|
const obj =
|
|
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
|
|
const sessionKey = typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
|
|
if (!sessionKey) {
|
|
return;
|
|
}
|
|
ctx.nodeUnsubscribe(nodeId, sessionKey);
|
|
return;
|
|
}
|
|
case "exec.started":
|
|
case "exec.finished":
|
|
case "exec.denied": {
|
|
if (!evt.payloadJSON) {
|
|
return;
|
|
}
|
|
let payload: unknown;
|
|
try {
|
|
payload = JSON.parse(evt.payloadJSON) as unknown;
|
|
} catch {
|
|
return;
|
|
}
|
|
const obj =
|
|
typeof payload === "object" && payload !== null ? (payload as Record<string, unknown>) : {};
|
|
const sessionKey =
|
|
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : `node-${nodeId}`;
|
|
if (!sessionKey) {
|
|
return;
|
|
}
|
|
const runId = typeof obj.runId === "string" ? obj.runId.trim() : "";
|
|
const command = typeof obj.command === "string" ? obj.command.trim() : "";
|
|
const exitCode =
|
|
typeof obj.exitCode === "number" && Number.isFinite(obj.exitCode)
|
|
? obj.exitCode
|
|
: undefined;
|
|
const timedOut = obj.timedOut === true;
|
|
const output = typeof obj.output === "string" ? obj.output.trim() : "";
|
|
const reason = typeof obj.reason === "string" ? obj.reason.trim() : "";
|
|
|
|
let text = "";
|
|
if (evt.event === "exec.started") {
|
|
text = `Exec started (node=${nodeId}${runId ? ` id=${runId}` : ""})`;
|
|
if (command) {
|
|
text += `: ${command}`;
|
|
}
|
|
} else if (evt.event === "exec.finished") {
|
|
const exitLabel = timedOut ? "timeout" : `code ${exitCode ?? "?"}`;
|
|
text = `Exec finished (node=${nodeId}${runId ? ` id=${runId}` : ""}, ${exitLabel})`;
|
|
if (output) {
|
|
text += `\n${output}`;
|
|
}
|
|
} else {
|
|
text = `Exec denied (node=${nodeId}${runId ? ` id=${runId}` : ""}${reason ? `, ${reason}` : ""})`;
|
|
if (command) {
|
|
text += `: ${command}`;
|
|
}
|
|
}
|
|
|
|
enqueueSystemEvent(text, { sessionKey, contextKey: runId ? `exec:${runId}` : "exec" });
|
|
requestHeartbeatNow({ reason: "exec-event" });
|
|
return;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
};
|