diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 613169dcb..8dcd25a41 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -60,8 +60,8 @@ describe("injectHistoryImagesIntoMessages", () => { }); describe("resolvePromptBuildHookResult", () => { - it("reuses precomputed legacy before_agent_start result without invoking hook again", async () => { - const hookRunner = { + function createLegacyOnlyHookRunner() { + return { hasHooks: vi.fn( (hookName: "before_prompt_build" | "before_agent_start") => hookName === "before_agent_start", @@ -69,6 +69,10 @@ describe("resolvePromptBuildHookResult", () => { runBeforePromptBuild: vi.fn(async () => undefined), runBeforeAgentStart: vi.fn(async () => ({ prependContext: "from-hook" })), }; + } + + it("reuses precomputed legacy before_agent_start result without invoking hook again", async () => { + const hookRunner = createLegacyOnlyHookRunner(); const result = await resolvePromptBuildHookResult({ prompt: "hello", messages: [], @@ -85,14 +89,7 @@ describe("resolvePromptBuildHookResult", () => { }); it("calls legacy hook when precomputed result is absent", async () => { - const hookRunner = { - hasHooks: vi.fn( - (hookName: "before_prompt_build" | "before_agent_start") => - hookName === "before_agent_start", - ), - runBeforePromptBuild: vi.fn(async () => undefined), - runBeforeAgentStart: vi.fn(async () => ({ prependContext: "from-hook" })), - }; + const hookRunner = createLegacyOnlyHookRunner(); const messages = [{ role: "user", content: "ctx" }]; const result = await resolvePromptBuildHookResult({ prompt: "hello", diff --git a/src/agents/pi-embedded-runner/run/payloads.errors.test.ts b/src/agents/pi-embedded-runner/run/payloads.errors.test.ts index 6804f035f..f255a700d 100644 --- a/src/agents/pi-embedded-runner/run/payloads.errors.test.ts +++ b/src/agents/pi-embedded-runner/run/payloads.errors.test.ts @@ -2,9 +2,15 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import { formatBillingErrorMessage } from "../../pi-embedded-helpers.js"; import { makeAssistantMessageFixture } from "../../test-helpers/assistant-message-fixtures.js"; -import { buildEmbeddedRunPayloads } from "./payloads.js"; +import { + buildPayloads, + expectSinglePayloadText, + expectSingleToolErrorPayload, +} from "./payloads.test-helpers.js"; describe("buildEmbeddedRunPayloads", () => { + const OVERLOADED_FALLBACK_TEXT = + "The AI service is temporarily overloaded. Please try again in a moment."; const errorJson = '{"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"},"request_id":"req_011CX7DwS7tSvggaNHmefwWg"}'; const errorJsonPretty = `{ @@ -22,31 +28,25 @@ describe("buildEmbeddedRunPayloads", () => { content: [{ type: "text", text: errorJson }], ...overrides, }); - - type BuildPayloadParams = Parameters[0]; - const buildPayloads = (overrides: Partial = {}) => - buildEmbeddedRunPayloads({ - assistantTexts: [], - toolMetas: [], - lastAssistant: undefined, - sessionKey: "session:telegram", - inlineToolResultsAllowed: false, - verboseLevel: "off", - reasoningLevel: "off", - toolResultFormat: "plain", - ...overrides, + const makeStoppedAssistant = () => + makeAssistant({ + stopReason: "stop", + errorMessage: undefined, + content: [], }); + const expectOverloadedFallback = (payloads: ReturnType) => { + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe(OVERLOADED_FALLBACK_TEXT); + }; + it("suppresses raw API error JSON when the assistant errored", () => { const payloads = buildPayloads({ assistantTexts: [errorJson], lastAssistant: makeAssistant({}), }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.text).toBe( - "The AI service is temporarily overloaded. Please try again in a moment.", - ); + expectOverloadedFallback(payloads); expect(payloads[0]?.isError).toBe(true); expect(payloads.some((payload) => payload.text === errorJson)).toBe(false); }); @@ -59,10 +59,7 @@ describe("buildEmbeddedRunPayloads", () => { verboseLevel: "on", }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.text).toBe( - "The AI service is temporarily overloaded. Please try again in a moment.", - ); + expectOverloadedFallback(payloads); expect(payloads.some((payload) => payload.text === errorJsonPretty)).toBe(false); }); @@ -71,10 +68,7 @@ describe("buildEmbeddedRunPayloads", () => { lastAssistant: makeAssistant({ content: [{ type: "text", text: errorJsonPretty }] }), }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.text).toBe( - "The AI service is temporarily overloaded. Please try again in a moment.", - ); + expectOverloadedFallback(payloads); expect(payloads.some((payload) => payload.text?.includes("request_id"))).toBe(false); }); @@ -108,15 +102,10 @@ describe("buildEmbeddedRunPayloads", () => { it("does not suppress error-shaped JSON when the assistant did not error", () => { const payloads = buildPayloads({ assistantTexts: [errorJsonPretty], - lastAssistant: makeAssistant({ - stopReason: "stop", - errorMessage: undefined, - content: [], - }), + lastAssistant: makeStoppedAssistant(), }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.text).toBe(errorJsonPretty.trim()); + expectSinglePayloadText(payloads, errorJsonPretty.trim()); }); it("adds a fallback error when a tool fails and no assistant output exists", () => { @@ -133,31 +122,21 @@ describe("buildEmbeddedRunPayloads", () => { it("does not add tool error fallback when assistant output exists", () => { const payloads = buildPayloads({ assistantTexts: ["All good"], - lastAssistant: makeAssistant({ - stopReason: "stop", - errorMessage: undefined, - content: [], - }), + lastAssistant: makeStoppedAssistant(), lastToolError: { toolName: "browser", error: "tab not found" }, }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.text).toBe("All good"); + expectSinglePayloadText(payloads, "All good"); }); it("adds completion fallback when tools run successfully without final assistant text", () => { const payloads = buildPayloads({ toolMetas: [{ toolName: "write", meta: "/tmp/out.md" }], - lastAssistant: makeAssistant({ - stopReason: "stop", - errorMessage: undefined, - content: [], - }), + lastAssistant: makeStoppedAssistant(), }); - expect(payloads).toHaveLength(1); + expectSinglePayloadText(payloads, "✅ Done."); expect(payloads[0]?.isError).toBeUndefined(); - expect(payloads[0]?.text).toBe("✅ Done."); }); it("does not add completion fallback when the run still has a tool error", () => { @@ -171,11 +150,7 @@ describe("buildEmbeddedRunPayloads", () => { it("does not add completion fallback when no tools ran", () => { const payloads = buildPayloads({ - lastAssistant: makeAssistant({ - stopReason: "stop", - errorMessage: undefined, - content: [], - }), + lastAssistant: makeStoppedAssistant(), }); expect(payloads).toHaveLength(0); @@ -199,10 +174,10 @@ describe("buildEmbeddedRunPayloads", () => { verboseLevel: "on", }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.isError).toBe(true); - expect(payloads[0]?.text).toContain("Exec"); - expect(payloads[0]?.text).toContain("code 1"); + expectSingleToolErrorPayload(payloads, { + title: "Exec", + detail: "code 1", + }); }); it("does not add tool error fallback when assistant text exists after tool calls", () => { diff --git a/src/agents/pi-embedded-runner/run/payloads.test-helpers.ts b/src/agents/pi-embedded-runner/run/payloads.test-helpers.ts new file mode 100644 index 000000000..9cb6bb5a2 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/payloads.test-helpers.ts @@ -0,0 +1,41 @@ +import { expect } from "vitest"; +import { buildEmbeddedRunPayloads } from "./payloads.js"; + +export type BuildPayloadParams = Parameters[0]; +type RunPayloads = ReturnType; + +export function buildPayloads(overrides: Partial = {}) { + return buildEmbeddedRunPayloads({ + assistantTexts: [], + toolMetas: [], + lastAssistant: undefined, + sessionKey: "session:telegram", + inlineToolResultsAllowed: false, + verboseLevel: "off", + reasoningLevel: "off", + toolResultFormat: "plain", + ...overrides, + }); +} + +export function expectSinglePayloadText( + payloads: RunPayloads, + text: string, + expectedError?: boolean, +): void { + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe(text); + if (typeof expectedError === "boolean") { + expect(payloads[0]?.isError).toBe(expectedError); + } +} + +export function expectSingleToolErrorPayload( + payloads: RunPayloads, + params: { title: string; detail: string }, +): void { + expect(payloads).toHaveLength(1); + expect(payloads[0]?.isError).toBe(true); + expect(payloads[0]?.text).toContain(params.title); + expect(payloads[0]?.text).toContain(params.detail); +} diff --git a/src/agents/pi-embedded-runner/run/payloads.test.ts b/src/agents/pi-embedded-runner/run/payloads.test.ts index bc35bb31c..871aaa3be 100644 --- a/src/agents/pi-embedded-runner/run/payloads.test.ts +++ b/src/agents/pi-embedded-runner/run/payloads.test.ts @@ -1,21 +1,5 @@ import { describe, expect, it } from "vitest"; -import { buildEmbeddedRunPayloads } from "./payloads.js"; - -type BuildPayloadParams = Parameters[0]; - -function buildPayloads(overrides: Partial = {}) { - return buildEmbeddedRunPayloads({ - assistantTexts: [], - toolMetas: [], - lastAssistant: undefined, - sessionKey: "session:telegram", - inlineToolResultsAllowed: false, - verboseLevel: "off", - reasoningLevel: "off", - toolResultFormat: "plain", - ...overrides, - }); -} +import { buildPayloads, expectSingleToolErrorPayload } from "./payloads.test-helpers.js"; describe("buildEmbeddedRunPayloads tool-error warnings", () => { it("suppresses exec tool errors when verbose mode is off", () => { @@ -33,10 +17,10 @@ describe("buildEmbeddedRunPayloads tool-error warnings", () => { verboseLevel: "on", }); - expect(payloads).toHaveLength(1); - expect(payloads[0]?.isError).toBe(true); - expect(payloads[0]?.text).toContain("Exec"); - expect(payloads[0]?.text).toContain("command failed"); + expectSingleToolErrorPayload(payloads, { + title: "Exec", + detail: "command failed", + }); }); it("keeps non-exec mutating tool failures visible", () => { diff --git a/src/agents/pi-extensions/context-pruning/pruner.ts b/src/agents/pi-extensions/context-pruning/pruner.ts index acfa63166..f9e3791b1 100644 --- a/src/agents/pi-extensions/context-pruning/pruner.ts +++ b/src/agents/pi-extensions/context-pruning/pruner.ts @@ -96,22 +96,26 @@ function hasImageBlocks(content: ReadonlyArray): boo return false; } +function estimateTextAndImageChars(content: ReadonlyArray): number { + let chars = 0; + for (const block of content) { + if (block.type === "text") { + chars += block.text.length; + } + if (block.type === "image") { + chars += IMAGE_CHAR_ESTIMATE; + } + } + return chars; +} + function estimateMessageChars(message: AgentMessage): number { if (message.role === "user") { const content = message.content; if (typeof content === "string") { return content.length; } - let chars = 0; - for (const b of content) { - if (b.type === "text") { - chars += b.text.length; - } - if (b.type === "image") { - chars += IMAGE_CHAR_ESTIMATE; - } - } - return chars; + return estimateTextAndImageChars(content); } if (message.role === "assistant") { @@ -135,16 +139,7 @@ function estimateMessageChars(message: AgentMessage): number { } if (message.role === "toolResult") { - let chars = 0; - for (const b of message.content) { - if (b.type === "text") { - chars += b.text.length; - } - if (b.type === "image") { - chars += IMAGE_CHAR_ESTIMATE; - } - } - return chars; + return estimateTextAndImageChars(message.content); } return 256; diff --git a/src/auto-reply/command-control.test.ts b/src/auto-reply/command-control.test.ts index d322acadd..9691391a2 100644 --- a/src/auto-reply/command-control.test.ts +++ b/src/auto-reply/command-control.test.ts @@ -27,118 +27,79 @@ afterEach(() => { }); describe("resolveCommandAuthorization", () => { - it("falls back from empty SenderId to SenderE164", () => { + function resolveWhatsAppAuthorization(params: { + from: string; + senderId?: string; + senderE164?: string; + allowFrom: string[]; + }) { const cfg = { - channels: { whatsapp: { allowFrom: ["+123"] } }, + channels: { whatsapp: { allowFrom: params.allowFrom } }, } as OpenClawConfig; - const ctx = { Provider: "whatsapp", Surface: "whatsapp", - From: "whatsapp:+999", - SenderId: "", - SenderE164: "+123", + From: params.from, + SenderId: params.senderId, + SenderE164: params.senderE164, } as MsgContext; - - const auth = resolveCommandAuthorization({ + return resolveCommandAuthorization({ ctx, cfg, commandAuthorized: true, }); + } - expect(auth.senderId).toBe("+123"); - expect(auth.isAuthorizedSender).toBe(true); - }); - - it("falls back from whitespace SenderId to SenderE164", () => { - const cfg = { - channels: { whatsapp: { allowFrom: ["+123"] } }, - } as OpenClawConfig; - - const ctx = { - Provider: "whatsapp", - Surface: "whatsapp", - From: "whatsapp:+999", - SenderId: " ", - SenderE164: "+123", - } as MsgContext; - - const auth = resolveCommandAuthorization({ - ctx, - cfg, - commandAuthorized: true, + it.each([ + { + name: "falls back from empty SenderId to SenderE164", + from: "whatsapp:+999", + senderId: "", + senderE164: "+123", + allowFrom: ["+123"], + expectedSenderId: "+123", + }, + { + name: "falls back from whitespace SenderId to SenderE164", + from: "whatsapp:+999", + senderId: " ", + senderE164: "+123", + allowFrom: ["+123"], + expectedSenderId: "+123", + }, + { + name: "falls back to From when SenderId and SenderE164 are whitespace", + from: "whatsapp:+999", + senderId: " ", + senderE164: " ", + allowFrom: ["+999"], + expectedSenderId: "+999", + }, + { + name: "falls back from un-normalizable SenderId to SenderE164", + from: "whatsapp:+999", + senderId: "wat", + senderE164: "+123", + allowFrom: ["+123"], + expectedSenderId: "+123", + }, + { + name: "prefers SenderE164 when SenderId does not match allowFrom", + from: "whatsapp:120363401234567890@g.us", + senderId: "123@lid", + senderE164: "+41796666864", + allowFrom: ["+41796666864"], + expectedSenderId: "+41796666864", + }, + ])("$name", ({ from, senderId, senderE164, allowFrom, expectedSenderId }) => { + const auth = resolveWhatsAppAuthorization({ + from, + senderId, + senderE164, + allowFrom, }); - expect(auth.senderId).toBe("+123"); - expect(auth.isAuthorizedSender).toBe(true); - }); - - it("falls back to From when SenderId and SenderE164 are whitespace", () => { - const cfg = { - channels: { whatsapp: { allowFrom: ["+999"] } }, - } as OpenClawConfig; - - const ctx = { - Provider: "whatsapp", - Surface: "whatsapp", - From: "whatsapp:+999", - SenderId: " ", - SenderE164: " ", - } as MsgContext; - - const auth = resolveCommandAuthorization({ - ctx, - cfg, - commandAuthorized: true, - }); - - expect(auth.senderId).toBe("+999"); - expect(auth.isAuthorizedSender).toBe(true); - }); - - it("falls back from un-normalizable SenderId to SenderE164", () => { - const cfg = { - channels: { whatsapp: { allowFrom: ["+123"] } }, - } as OpenClawConfig; - - const ctx = { - Provider: "whatsapp", - Surface: "whatsapp", - From: "whatsapp:+999", - SenderId: "wat", - SenderE164: "+123", - } as MsgContext; - - const auth = resolveCommandAuthorization({ - ctx, - cfg, - commandAuthorized: true, - }); - - expect(auth.senderId).toBe("+123"); - expect(auth.isAuthorizedSender).toBe(true); - }); - - it("prefers SenderE164 when SenderId does not match allowFrom", () => { - const cfg = { - channels: { whatsapp: { allowFrom: ["+41796666864"] } }, - } as OpenClawConfig; - - const ctx = { - Provider: "whatsapp", - Surface: "whatsapp", - From: "whatsapp:120363401234567890@g.us", - SenderId: "123@lid", - SenderE164: "+41796666864", - } as MsgContext; - - const auth = resolveCommandAuthorization({ - ctx, - cfg, - commandAuthorized: true, - }); - - expect(auth.senderId).toBe("+41796666864"); + expect(auth.senderId).toBe(expectedSenderId); expect(auth.isAuthorizedSender).toBe(true); }); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index a5add8541..10d4efdd5 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -148,6 +148,27 @@ describe("createFollowupRunner compaction", () => { }); describe("createFollowupRunner messaging tool dedupe", () => { + function createMessagingDedupeRunner( + onBlockReply: (payload: unknown) => Promise, + overrides: Partial<{ + sessionEntry: SessionEntry; + sessionStore: Record; + sessionKey: string; + storePath: string; + }> = {}, + ) { + return createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + sessionEntry: overrides.sessionEntry, + sessionStore: overrides.sessionStore, + sessionKey: overrides.sessionKey, + storePath: overrides.storePath, + }); + } + it("drops payloads already sent via messaging tool", async () => { const onBlockReply = vi.fn(async () => {}); runEmbeddedPiAgentMock.mockResolvedValueOnce({ @@ -156,12 +177,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { meta: {}, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", - defaultModel: "anthropic/claude-opus-4-5", - }); + const runner = createMessagingDedupeRunner(onBlockReply); await runner(baseQueuedRun()); @@ -176,12 +192,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { meta: {}, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", - defaultModel: "anthropic/claude-opus-4-5", - }); + const runner = createMessagingDedupeRunner(onBlockReply); await runner(baseQueuedRun()); @@ -197,12 +208,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { meta: {}, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", - defaultModel: "anthropic/claude-opus-4-5", - }); + const runner = createMessagingDedupeRunner(onBlockReply); await runner(baseQueuedRun("slack")); @@ -217,12 +223,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { meta: {}, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", - defaultModel: "anthropic/claude-opus-4-5", - }); + const runner = createMessagingDedupeRunner(onBlockReply); await runner(baseQueuedRun()); @@ -238,12 +239,7 @@ describe("createFollowupRunner messaging tool dedupe", () => { meta: {}, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", - defaultModel: "anthropic/claude-opus-4-5", - }); + const runner = createMessagingDedupeRunner(onBlockReply); await runner(baseQueuedRun()); @@ -275,15 +271,11 @@ describe("createFollowupRunner messaging tool dedupe", () => { }, }); - const runner = createFollowupRunner({ - opts: { onBlockReply }, - typing: createMockTypingController(), - typingMode: "instant", + const runner = createMessagingDedupeRunner(onBlockReply, { sessionEntry, sessionStore, sessionKey, storePath, - defaultModel: "anthropic/claude-opus-4-5", }); await runner(baseQueuedRun("slack")); diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index f5444c0a9..09e848dc0 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,5 +1,5 @@ import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; -import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js"; +import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; function isRunAlreadyQueued( @@ -57,11 +57,7 @@ export function enqueueFollowupRun( } export function getFollowupQueueDepth(key: string): number { - const cleaned = key.trim(); - if (!cleaned) { - return 0; - } - const queue = FOLLOWUP_QUEUES.get(cleaned); + const queue = getExistingFollowupQueue(key); if (!queue) { return 0; } diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 6f135d98a..73f7ed946 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -20,6 +20,14 @@ export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize"; export const FOLLOWUP_QUEUES = new Map(); +export function getExistingFollowupQueue(key: string): FollowupQueueState | undefined { + const cleaned = key.trim(); + if (!cleaned) { + return undefined; + } + return FOLLOWUP_QUEUES.get(cleaned); +} + export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState { const existing = FOLLOWUP_QUEUES.get(key); if (existing) { @@ -57,10 +65,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup export function clearFollowupQueue(key: string): number { const cleaned = key.trim(); - if (!cleaned) { - return 0; - } - const queue = FOLLOWUP_QUEUES.get(cleaned); + const queue = getExistingFollowupQueue(cleaned); if (!queue) { return 0; } diff --git a/src/config/runtime-group-policy-provider.ts b/src/config/runtime-group-policy-provider.ts new file mode 100644 index 000000000..887f35c3a --- /dev/null +++ b/src/config/runtime-group-policy-provider.ts @@ -0,0 +1,19 @@ +import { resolveRuntimeGroupPolicy } from "./runtime-group-policy.js"; +import type { GroupPolicy } from "./types.base.js"; + +export function resolveProviderRuntimeGroupPolicy(params: { + providerConfigPresent: boolean; + groupPolicy?: GroupPolicy; + defaultGroupPolicy?: GroupPolicy; +}): { + groupPolicy: GroupPolicy; + providerMissingFallbackApplied: boolean; +} { + return resolveRuntimeGroupPolicy({ + providerConfigPresent: params.providerConfigPresent, + groupPolicy: params.groupPolicy, + defaultGroupPolicy: params.defaultGroupPolicy, + configuredFallbackPolicy: "open", + missingProviderFallbackPolicy: "allowlist", + }); +} diff --git a/src/gateway/protocol/schema/exec-approvals.ts b/src/gateway/protocol/schema/exec-approvals.ts index 05c2e0376..28baa3357 100644 --- a/src/gateway/protocol/schema/exec-approvals.ts +++ b/src/gateway/protocol/schema/exec-approvals.ts @@ -12,22 +12,20 @@ export const ExecApprovalsAllowlistEntrySchema = Type.Object( { additionalProperties: false }, ); -export const ExecApprovalsDefaultsSchema = Type.Object( - { - security: Type.Optional(Type.String()), - ask: Type.Optional(Type.String()), - askFallback: Type.Optional(Type.String()), - autoAllowSkills: Type.Optional(Type.Boolean()), - }, - { additionalProperties: false }, -); +const ExecApprovalsPolicyFields = { + security: Type.Optional(Type.String()), + ask: Type.Optional(Type.String()), + askFallback: Type.Optional(Type.String()), + autoAllowSkills: Type.Optional(Type.Boolean()), +}; + +export const ExecApprovalsDefaultsSchema = Type.Object(ExecApprovalsPolicyFields, { + additionalProperties: false, +}); export const ExecApprovalsAgentSchema = Type.Object( { - security: Type.Optional(Type.String()), - ask: Type.Optional(Type.String()), - askFallback: Type.Optional(Type.String()), - autoAllowSkills: Type.Optional(Type.Boolean()), + ...ExecApprovalsPolicyFields, allowlist: Type.Optional(Type.Array(ExecApprovalsAllowlistEntrySchema)), }, { additionalProperties: false }, diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index 54ee8df36..19b870975 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -622,19 +622,22 @@ export function attachGatewayWsMessageHandler(params: { `security audit: device access upgrade requested reason=${reason} device=${device.id} ip=${reportedClientIp ?? "unknown-ip"} auth=${authMethod} roleFrom=${formatAuditList(currentRoles)} roleTo=${role} scopesFrom=${formatAuditList(currentScopes)} scopesTo=${formatAuditList(scopes)} client=${connectParams.client.id} conn=${connId}`, ); }; + const clientAccessMetadata = { + displayName: connectParams.client.displayName, + platform: connectParams.client.platform, + clientId: connectParams.client.id, + clientMode: connectParams.client.mode, + role, + scopes, + remoteIp: reportedClientIp, + }; const requirePairing = async ( reason: "not-paired" | "role-upgrade" | "scope-upgrade", ) => { const pairing = await requestDevicePairing({ deviceId: device.id, publicKey: devicePublicKey, - displayName: connectParams.client.displayName, - platform: connectParams.client.platform, - clientId: connectParams.client.id, - clientMode: connectParams.client.mode, - role, - scopes, - remoteIp: reportedClientIp, + ...clientAccessMetadata, silent: isLocalClient && reason === "not-paired", }); const context = buildRequestContext(); @@ -735,15 +738,7 @@ export function attachGatewayWsMessageHandler(params: { } } - await updatePairedDeviceMetadata(device.id, { - displayName: connectParams.client.displayName, - platform: connectParams.client.platform, - clientId: connectParams.client.id, - clientMode: connectParams.client.mode, - role, - scopes, - remoteIp: reportedClientIp, - }); + await updatePairedDeviceMetadata(device.id, clientAccessMetadata); } } diff --git a/src/hooks/bundled/boot-md/handler.test.ts b/src/hooks/bundled/boot-md/handler.test.ts index bb0e76767..b212842fb 100644 --- a/src/hooks/bundled/boot-md/handler.test.ts +++ b/src/hooks/bundled/boot-md/handler.test.ts @@ -46,6 +46,12 @@ describe("boot-md handler", () => { return cfg; } + function setupSingleMainAgentBootConfig(cfg: unknown) { + listAgentIds.mockReturnValue(["main"]); + resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR); + return cfg; + } + beforeEach(() => { vi.clearAllMocks(); }); @@ -82,9 +88,7 @@ describe("boot-md handler", () => { }); it("runs boot for single default agent when no agents configured", async () => { - const cfg = {}; - listAgentIds.mockReturnValue(["main"]); - resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR); + const cfg = setupSingleMainAgentBootConfig({}); runBootOnce.mockResolvedValue({ status: "skipped", reason: "missing" }); await runBootChecklist(makeEvent({ context: { cfg } })); @@ -112,9 +116,7 @@ describe("boot-md handler", () => { }); it("logs debug details when a per-agent boot run is skipped", async () => { - const cfg = { agents: { list: [{ id: "main" }] } }; - listAgentIds.mockReturnValue(["main"]); - resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR); + const cfg = setupSingleMainAgentBootConfig({ agents: { list: [{ id: "main" }] } }); runBootOnce.mockResolvedValue({ status: "skipped", reason: "missing" }); await runBootChecklist(makeEvent({ context: { cfg } })); diff --git a/src/hooks/bundled/session-memory/handler.test.ts b/src/hooks/bundled/session-memory/handler.test.ts index 1d7aa63ba..0b2b10eb0 100644 --- a/src/hooks/bundled/session-memory/handler.test.ts +++ b/src/hooks/bundled/session-memory/handler.test.ts @@ -133,6 +133,33 @@ async function createSessionMemoryWorkspace(params?: { return { tempDir, sessionsDir, activeSessionFile }; } +async function loadMemoryFromActiveSessionPointer(params: { + tempDir: string; + activeSessionFile: string; +}): Promise { + const { memoryContent } = await runNewWithPreviousSessionEntry({ + tempDir: params.tempDir, + previousSessionEntry: { + sessionId: "test-123", + sessionFile: params.activeSessionFile, + }, + }); + return memoryContent; +} + +function expectMemoryConversation(params: { + memoryContent: string; + user: string; + assistant: string; + absent?: string; +}) { + expect(params.memoryContent).toContain(`user: ${params.user}`); + expect(params.memoryContent).toContain(`assistant: ${params.assistant}`); + if (params.absent) { + expect(params.memoryContent).not.toContain(params.absent); + } +} + describe("session-memory hook", () => { it("skips non-command events", async () => { const tempDir = await makeTempWorkspace("openclaw-session-memory-"); @@ -415,17 +442,17 @@ describe("session-memory hook", () => { ]), }); - const { memoryContent } = await runNewWithPreviousSessionEntry({ + const memoryContent = await loadMemoryFromActiveSessionPointer({ tempDir, - previousSessionEntry: { - sessionId: "test-123", - sessionFile: activeSessionFile!, - }, + activeSessionFile: activeSessionFile!, }); - expect(memoryContent).toContain("user: Newest rotated transcript"); - expect(memoryContent).toContain("assistant: Newest summary"); - expect(memoryContent).not.toContain("Older rotated transcript"); + expectMemoryConversation({ + memoryContent, + user: "Newest rotated transcript", + assistant: "Newest summary", + absent: "Older rotated transcript", + }); }); it("prefers active transcript when it is non-empty even with reset candidates", async () => { @@ -448,17 +475,17 @@ describe("session-memory hook", () => { ]), }); - const { memoryContent } = await runNewWithPreviousSessionEntry({ + const memoryContent = await loadMemoryFromActiveSessionPointer({ tempDir, - previousSessionEntry: { - sessionId: "test-123", - sessionFile: activeSessionFile!, - }, + activeSessionFile: activeSessionFile!, }); - expect(memoryContent).toContain("user: Active transcript message"); - expect(memoryContent).toContain("assistant: Active transcript summary"); - expect(memoryContent).not.toContain("Reset fallback message"); + expectMemoryConversation({ + memoryContent, + user: "Active transcript message", + assistant: "Active transcript summary", + absent: "Reset fallback message", + }); }); it("handles empty session files gracefully", async () => { diff --git a/src/hooks/loader.test.ts b/src/hooks/loader.test.ts index 419884e39..66ccf04b8 100644 --- a/src/hooks/loader.test.ts +++ b/src/hooks/loader.test.ts @@ -33,6 +33,25 @@ describe("loader", () => { process.env.OPENCLAW_BUNDLED_HOOKS_DIR = "/nonexistent/bundled/hooks"; }); + async function writeHandlerModule( + fileName: string, + code = "export default async function() {}", + ): Promise { + const handlerPath = path.join(tmpDir, fileName); + await fs.writeFile(handlerPath, code, "utf-8"); + return handlerPath; + } + + function createEnabledHooksConfig( + handlers?: Array<{ event: string; module: string; export?: string }>, + ): OpenClawConfig { + return { + hooks: { + internal: handlers ? { enabled: true, handlers } : { enabled: true }, + }, + }; + } + afterEach(async () => { clearInternalHooks(); envSnapshot.restore(); @@ -67,27 +86,18 @@ describe("loader", () => { it("should load a handler from a module", async () => { // Create a test handler module - const handlerPath = path.join(tmpDir, "test-handler.js"); const handlerCode = ` export default async function(event) { // Test handler } `; - await fs.writeFile(handlerPath, handlerCode, "utf-8"); - - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: path.basename(handlerPath), - }, - ], - }, + const handlerPath = await writeHandlerModule("test-handler.js", handlerCode); + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: path.basename(handlerPath), }, - }; + ]); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(1); @@ -98,23 +108,13 @@ describe("loader", () => { it("should load multiple handlers", async () => { // Create test handler modules - const handler1Path = path.join(tmpDir, "handler1.js"); - const handler2Path = path.join(tmpDir, "handler2.js"); + const handler1Path = await writeHandlerModule("handler1.js"); + const handler2Path = await writeHandlerModule("handler2.js"); - await fs.writeFile(handler1Path, "export default async function() {}", "utf-8"); - await fs.writeFile(handler2Path, "export default async function() {}", "utf-8"); - - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { event: "command:new", module: path.basename(handler1Path) }, - { event: "command:stop", module: path.basename(handler2Path) }, - ], - }, - }, - }; + const cfg = createEnabledHooksConfig([ + { event: "command:new", module: path.basename(handler1Path) }, + { event: "command:stop", module: path.basename(handler2Path) }, + ]); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(2); @@ -126,47 +126,32 @@ describe("loader", () => { it("should support named exports", async () => { // Create a handler module with named export - const handlerPath = path.join(tmpDir, "named-export.js"); const handlerCode = ` export const myHandler = async function(event) { // Named export handler } `; - await fs.writeFile(handlerPath, handlerCode, "utf-8"); + const handlerPath = await writeHandlerModule("named-export.js", handlerCode); - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: path.basename(handlerPath), - export: "myHandler", - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: path.basename(handlerPath), + export: "myHandler", }, - }; + ]); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(1); }); it("should handle module loading errors gracefully", async () => { - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: "missing-handler.js", - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: "missing-handler.js", }, - }; + ]); // Should not throw and should return 0 (handler failed to load) const count = await loadInternalHooks(cfg, tmpDir); @@ -175,22 +160,17 @@ describe("loader", () => { it("should handle non-function exports", async () => { // Create a module with a non-function export - const handlerPath = path.join(tmpDir, "bad-export.js"); - await fs.writeFile(handlerPath, 'export default "not a function";', "utf-8"); + const handlerPath = await writeHandlerModule( + "bad-export.js", + 'export default "not a function";', + ); - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: path.basename(handlerPath), - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: path.basename(handlerPath), }, - }; + ]); // Should not throw and should return 0 (handler is not a function) const count = await loadInternalHooks(cfg, tmpDir); @@ -199,25 +179,17 @@ describe("loader", () => { it("should handle relative paths", async () => { // Create a handler module - const handlerPath = path.join(tmpDir, "relative-handler.js"); - await fs.writeFile(handlerPath, "export default async function() {}", "utf-8"); + const handlerPath = await writeHandlerModule("relative-handler.js"); // Relative to workspaceDir (tmpDir) const relativePath = path.relative(tmpDir, handlerPath); - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: relativePath, - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: relativePath, }, - }; + ]); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(1); @@ -225,7 +197,6 @@ describe("loader", () => { it("should actually call the loaded handler", async () => { // Create a handler that we can verify was called - const handlerPath = path.join(tmpDir, "callable-handler.js"); const handlerCode = ` let callCount = 0; export default async function(event) { @@ -235,21 +206,14 @@ describe("loader", () => { return callCount; } `; - await fs.writeFile(handlerPath, handlerCode, "utf-8"); + const handlerPath = await writeHandlerModule("callable-handler.js", handlerCode); - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: path.basename(handlerPath), - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: path.basename(handlerPath), }, - }; + ]); await loadInternalHooks(cfg, tmpDir); @@ -288,13 +252,7 @@ describe("loader", () => { return; } - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - }, - }, - }; + const cfg = createEnabledHooksConfig(); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(0); @@ -312,19 +270,12 @@ describe("loader", () => { return; } - const cfg: OpenClawConfig = { - hooks: { - internal: { - enabled: true, - handlers: [ - { - event: "command:new", - module: "legacy-handler.js", - }, - ], - }, + const cfg = createEnabledHooksConfig([ + { + event: "command:new", + module: "legacy-handler.js", }, - }; + ]); const count = await loadInternalHooks(cfg, tmpDir); expect(count).toBe(0); diff --git a/src/media-understanding/providers/deepgram/audio.ts b/src/media-understanding/providers/deepgram/audio.ts index b32f18c87..25cb4045d 100644 --- a/src/media-understanding/providers/deepgram/audio.ts +++ b/src/media-understanding/providers/deepgram/audio.ts @@ -1,5 +1,10 @@ import type { AudioTranscriptionRequest, AudioTranscriptionResult } from "../../types.js"; -import { assertOkOrThrowHttpError, fetchWithTimeoutGuarded, normalizeBaseUrl } from "../shared.js"; +import { + assertOkOrThrowHttpError, + normalizeBaseUrl, + postTranscriptionRequest, + requireTranscriptionText, +} from "../shared.js"; export const DEFAULT_DEEPGRAM_AUDIO_BASE_URL = "https://api.deepgram.com/v1"; export const DEFAULT_DEEPGRAM_AUDIO_MODEL = "nova-3"; @@ -50,26 +55,23 @@ export async function transcribeDeepgramAudio( } const body = new Uint8Array(params.buffer); - const { response: res, release } = await fetchWithTimeoutGuarded( - url.toString(), - { - method: "POST", - headers, - body, - }, - params.timeoutMs, + const { response: res, release } = await postTranscriptionRequest({ + url: url.toString(), + headers, + body, + timeoutMs: params.timeoutMs, fetchFn, - allowPrivate ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined, - ); + allowPrivateNetwork: allowPrivate, + }); try { await assertOkOrThrowHttpError(res, "Audio transcription failed"); const payload = (await res.json()) as DeepgramTranscriptResponse; - const transcript = payload.results?.channels?.[0]?.alternatives?.[0]?.transcript?.trim(); - if (!transcript) { - throw new Error("Audio transcription response missing transcript"); - } + const transcript = requireTranscriptionText( + payload.results?.channels?.[0]?.alternatives?.[0]?.transcript, + "Audio transcription response missing transcript", + ); return { text: transcript, model }; } finally { await release(); diff --git a/src/media-understanding/providers/google/video.test.ts b/src/media-understanding/providers/google/video.test.ts index fb2682d81..772d01e2d 100644 --- a/src/media-understanding/providers/google/video.test.ts +++ b/src/media-understanding/providers/google/video.test.ts @@ -1,20 +1,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import * as ssrf from "../../../infra/net/ssrf.js"; import { withFetchPreconnect } from "../../../test-utils/fetch-mock.js"; +import { createRequestCaptureJsonFetch } from "../audio.test-helpers.js"; import { describeGeminiVideo } from "./video.js"; const TEST_NET_IP = "203.0.113.10"; -const resolveRequestUrl = (input: RequestInfo | URL) => { - if (typeof input === "string") { - return input; - } - if (input instanceof URL) { - return input.toString(); - } - return input.url; -}; - function stubPinnedHostname(hostname: string) { const normalized = hostname.trim().toLowerCase().replace(/\.$/, ""); const addresses = [TEST_NET_IP]; @@ -73,23 +64,14 @@ describe("describeGeminiVideo", () => { }); it("builds the expected request payload", async () => { - let seenUrl: string | null = null; - let seenInit: RequestInit | undefined; - const fetchFn = withFetchPreconnect(async (input: RequestInfo | URL, init?: RequestInit) => { - seenUrl = resolveRequestUrl(input); - seenInit = init; - return new Response( - JSON.stringify({ - candidates: [ - { - content: { - parts: [{ text: "first" }, { text: " second " }, { text: "" }], - }, - }, - ], - }), - { status: 200, headers: { "content-type": "application/json" } }, - ); + const { fetchFn, getRequest } = createRequestCaptureJsonFetch({ + candidates: [ + { + content: { + parts: [{ text: "first" }, { text: " second " }, { text: "" }], + }, + }, + ], }); const result = await describeGeminiVideo({ @@ -102,6 +84,7 @@ describe("describeGeminiVideo", () => { headers: { "X-Other": "1" }, fetchFn, }); + const { url: seenUrl, init: seenInit } = getRequest(); expect(result.model).toBe("gemini-3-pro-preview"); expect(result.text).toBe("first\nsecond"); diff --git a/src/media-understanding/providers/openai/audio.ts b/src/media-understanding/providers/openai/audio.ts index 2635ad23d..26db4b0c2 100644 --- a/src/media-understanding/providers/openai/audio.ts +++ b/src/media-understanding/providers/openai/audio.ts @@ -1,6 +1,11 @@ import path from "node:path"; import type { AudioTranscriptionRequest, AudioTranscriptionResult } from "../../types.js"; -import { assertOkOrThrowHttpError, fetchWithTimeoutGuarded, normalizeBaseUrl } from "../shared.js"; +import { + assertOkOrThrowHttpError, + normalizeBaseUrl, + postTranscriptionRequest, + requireTranscriptionText, +} from "../shared.js"; export const DEFAULT_OPENAI_AUDIO_BASE_URL = "https://api.openai.com/v1"; const DEFAULT_OPENAI_AUDIO_MODEL = "gpt-4o-mini-transcribe"; @@ -39,26 +44,23 @@ export async function transcribeOpenAiCompatibleAudio( headers.set("authorization", `Bearer ${params.apiKey}`); } - const { response: res, release } = await fetchWithTimeoutGuarded( + const { response: res, release } = await postTranscriptionRequest({ url, - { - method: "POST", - headers, - body: form, - }, - params.timeoutMs, + headers, + body: form, + timeoutMs: params.timeoutMs, fetchFn, - allowPrivate ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined, - ); + allowPrivateNetwork: allowPrivate, + }); try { await assertOkOrThrowHttpError(res, "Audio transcription failed"); const payload = (await res.json()) as { text?: string }; - const text = payload.text?.trim(); - if (!text) { - throw new Error("Audio transcription response missing text"); - } + const text = requireTranscriptionText( + payload.text, + "Audio transcription response missing text", + ); return { text, model }; } finally { await release(); diff --git a/src/media-understanding/providers/shared.ts b/src/media-understanding/providers/shared.ts index 1fac7ba5b..96145b2e7 100644 --- a/src/media-understanding/providers/shared.ts +++ b/src/media-understanding/providers/shared.ts @@ -32,6 +32,27 @@ export async function fetchWithTimeoutGuarded( }); } +export async function postTranscriptionRequest(params: { + url: string; + headers: Headers; + body: BodyInit; + timeoutMs: number; + fetchFn: typeof fetch; + allowPrivateNetwork?: boolean; +}) { + return fetchWithTimeoutGuarded( + params.url, + { + method: "POST", + headers: params.headers, + body: params.body, + }, + params.timeoutMs, + params.fetchFn, + params.allowPrivateNetwork ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined, + ); +} + export async function readErrorResponse(res: Response): Promise { try { const text = await res.text(); @@ -56,3 +77,14 @@ export async function assertOkOrThrowHttpError(res: Response, label: string): Pr const suffix = detail ? `: ${detail}` : ""; throw new Error(`${label} (HTTP ${res.status})${suffix}`); } + +export function requireTranscriptionText( + value: string | undefined, + missingMessage: string, +): string { + const text = value?.trim(); + if (!text) { + throw new Error(missingMessage); + } + return text; +} diff --git a/src/process/supervisor/adapters/child.test.ts b/src/process/supervisor/adapters/child.test.ts index 780b32f4b..9c46bdd0c 100644 --- a/src/process/supervisor/adapters/child.test.ts +++ b/src/process/supervisor/adapters/child.test.ts @@ -9,11 +9,11 @@ const { spawnWithFallbackMock, killProcessTreeMock } = vi.hoisted(() => ({ })); vi.mock("../../spawn-utils.js", () => ({ - spawnWithFallback: (...args: unknown[]) => spawnWithFallbackMock(...args), + spawnWithFallback: spawnWithFallbackMock, })); vi.mock("../../kill-tree.js", () => ({ - killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args), + killProcessTree: killProcessTreeMock, })); let createChildAdapter: typeof import("./child.js").createChildAdapter; diff --git a/src/process/supervisor/adapters/pty.test.ts b/src/process/supervisor/adapters/pty.test.ts index 07df965be..32ca418b5 100644 --- a/src/process/supervisor/adapters/pty.test.ts +++ b/src/process/supervisor/adapters/pty.test.ts @@ -31,6 +31,11 @@ function createStubPty(pid = 1234) { }; } +function expectSpawnEnv() { + const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record }; + return spawnOptions?.env; +} + describe("createPtyAdapter", () => { let createPtyAdapter: typeof import("./pty.js").createPtyAdapter; @@ -152,8 +157,7 @@ describe("createPtyAdapter", () => { args: ["-lc", "env"], }); - const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record }; - expect(spawnOptions?.env).toBeUndefined(); + expect(expectSpawnEnv()).toBeUndefined(); }); it("passes explicit env overrides as strings", async () => { @@ -166,8 +170,7 @@ describe("createPtyAdapter", () => { env: { FOO: "bar", COUNT: "12", DROP_ME: undefined }, }); - const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record }; - expect(spawnOptions?.env).toEqual({ FOO: "bar", COUNT: "12" }); + expect(expectSpawnEnv()).toEqual({ FOO: "bar", COUNT: "12" }); }); it("does not pass a signal to node-pty on Windows", async () => { diff --git a/src/process/supervisor/registry.test.ts b/src/process/supervisor/registry.test.ts index 64d56d33d..27206374a 100644 --- a/src/process/supervisor/registry.test.ts +++ b/src/process/supervisor/registry.test.ts @@ -1,19 +1,35 @@ import { describe, expect, it } from "vitest"; import { createRunRegistry } from "./registry.js"; +type RunRegistry = ReturnType; + +function addRunningRecord( + registry: RunRegistry, + params: { + runId: string; + sessionId: string; + startedAtMs: number; + scopeKey?: string; + backendId?: string; + }, +) { + registry.add({ + runId: params.runId, + sessionId: params.sessionId, + backendId: params.backendId ?? "b1", + scopeKey: params.scopeKey, + state: "running", + startedAtMs: params.startedAtMs, + lastOutputAtMs: params.startedAtMs, + createdAtMs: params.startedAtMs, + updatedAtMs: params.startedAtMs, + }); +} + describe("process supervisor run registry", () => { it("finalize is idempotent and preserves first terminal metadata", () => { const registry = createRunRegistry(); - registry.add({ - runId: "r1", - sessionId: "s1", - backendId: "b1", - state: "running", - startedAtMs: 1, - lastOutputAtMs: 1, - createdAtMs: 1, - updatedAtMs: 1, - }); + addRunningRecord(registry, { runId: "r1", sessionId: "s1", startedAtMs: 1 }); const first = registry.finalize("r1", { reason: "overall-timeout", @@ -41,36 +57,9 @@ describe("process supervisor run registry", () => { it("prunes oldest exited records once retention cap is exceeded", () => { const registry = createRunRegistry({ maxExitedRecords: 2 }); - registry.add({ - runId: "r1", - sessionId: "s1", - backendId: "b1", - state: "running", - startedAtMs: 1, - lastOutputAtMs: 1, - createdAtMs: 1, - updatedAtMs: 1, - }); - registry.add({ - runId: "r2", - sessionId: "s2", - backendId: "b1", - state: "running", - startedAtMs: 2, - lastOutputAtMs: 2, - createdAtMs: 2, - updatedAtMs: 2, - }); - registry.add({ - runId: "r3", - sessionId: "s3", - backendId: "b1", - state: "running", - startedAtMs: 3, - lastOutputAtMs: 3, - createdAtMs: 3, - updatedAtMs: 3, - }); + addRunningRecord(registry, { runId: "r1", sessionId: "s1", startedAtMs: 1 }); + addRunningRecord(registry, { runId: "r2", sessionId: "s2", startedAtMs: 2 }); + addRunningRecord(registry, { runId: "r3", sessionId: "s3", startedAtMs: 3 }); registry.finalize("r1", { reason: "exit", exitCode: 0, exitSignal: null }); registry.finalize("r2", { reason: "exit", exitCode: 0, exitSignal: null }); @@ -80,4 +69,32 @@ describe("process supervisor run registry", () => { expect(registry.get("r2")?.state).toBe("exited"); expect(registry.get("r3")?.state).toBe("exited"); }); + + it("filters listByScope and returns detached copies", () => { + const registry = createRunRegistry(); + addRunningRecord(registry, { + runId: "r1", + sessionId: "s1", + scopeKey: "scope:a", + startedAtMs: 1, + }); + addRunningRecord(registry, { + runId: "r2", + sessionId: "s2", + scopeKey: "scope:b", + startedAtMs: 2, + }); + + expect(registry.listByScope(" ")).toEqual([]); + const scoped = registry.listByScope("scope:a"); + expect(scoped).toHaveLength(1); + const [firstScoped] = scoped; + expect(firstScoped?.runId).toBe("r1"); + + if (!firstScoped) { + throw new Error("missing scoped record"); + } + firstScoped.state = "exited"; + expect(registry.get("r1")?.state).toBe("running"); + }); }); diff --git a/src/process/supervisor/supervisor.test.ts b/src/process/supervisor/supervisor.test.ts index 0e4e56a64..71d7be893 100644 --- a/src/process/supervisor/supervisor.test.ts +++ b/src/process/supervisor/supervisor.test.ts @@ -1,13 +1,23 @@ import { describe, expect, it } from "vitest"; import { createProcessSupervisor } from "./supervisor.js"; +type ProcessSupervisor = ReturnType; +type SpawnOptions = Parameters[0]; +type ChildSpawnOptions = Omit, "backendId" | "mode">; + +async function spawnChild(supervisor: ProcessSupervisor, options: ChildSpawnOptions) { + return supervisor.spawn({ + ...options, + backendId: "test", + mode: "child", + }); +} + describe("process supervisor", () => { it("spawns child runs and captures output", async () => { const supervisor = createProcessSupervisor(); - const run = await supervisor.spawn({ + const run = await spawnChild(supervisor, { sessionId: "s1", - backendId: "test", - mode: "child", argv: [process.execPath, "-e", 'process.stdout.write("ok")'], timeoutMs: 2_500, stdinMode: "pipe-closed", @@ -20,10 +30,8 @@ describe("process supervisor", () => { it("enforces no-output timeout for silent processes", async () => { const supervisor = createProcessSupervisor(); - const run = await supervisor.spawn({ + const run = await spawnChild(supervisor, { sessionId: "s1", - backendId: "test", - mode: "child", argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"], timeoutMs: 1_000, noOutputTimeoutMs: 20, @@ -37,22 +45,18 @@ describe("process supervisor", () => { it("cancels prior scoped run when replaceExistingScope is enabled", async () => { const supervisor = createProcessSupervisor(); - const first = await supervisor.spawn({ + const first = await spawnChild(supervisor, { sessionId: "s1", - backendId: "test", scopeKey: "scope:a", - mode: "child", argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"], timeoutMs: 1_000, stdinMode: "pipe-open", }); - const second = await supervisor.spawn({ + const second = await spawnChild(supervisor, { sessionId: "s1", - backendId: "test", scopeKey: "scope:a", replaceExistingScope: true, - mode: "child", argv: [process.execPath, "-e", 'process.stdout.write("new")'], timeoutMs: 2_500, stdinMode: "pipe-closed", @@ -67,10 +71,8 @@ describe("process supervisor", () => { it("applies overall timeout even for near-immediate timer firing", async () => { const supervisor = createProcessSupervisor(); - const run = await supervisor.spawn({ + const run = await spawnChild(supervisor, { sessionId: "s-timeout", - backendId: "test", - mode: "child", argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"], timeoutMs: 1, stdinMode: "pipe-closed", @@ -83,10 +85,8 @@ describe("process supervisor", () => { it("can stream output without retaining it in RunExit payload", async () => { const supervisor = createProcessSupervisor(); let streamed = ""; - const run = await supervisor.spawn({ + const run = await spawnChild(supervisor, { sessionId: "s-capture", - backendId: "test", - mode: "child", argv: [process.execPath, "-e", 'process.stdout.write("streamed")'], timeoutMs: 2_500, stdinMode: "pipe-closed", diff --git a/src/slack/monitor/events/interactions.ts b/src/slack/monitor/events/interactions.ts index 094c57a9b..cbc4fc9f3 100644 --- a/src/slack/monitor/events/interactions.ts +++ b/src/slack/monitor/events/interactions.ts @@ -99,6 +99,11 @@ type SlackModalEventBase = { }; type SlackModalInteractionKind = "view_submission" | "view_closed"; +type SlackModalEventHandlerArgs = { ack: () => Promise; body: unknown }; +type RegisterSlackModalHandler = ( + matcher: RegExp, + handler: (args: SlackModalEventHandlerArgs) => Promise, +) => void; function readOptionValues(options: unknown): string[] | undefined { if (!Array.isArray(options)) { @@ -483,6 +488,24 @@ function emitSlackModalLifecycleEvent(params: { }); } +function registerModalLifecycleHandler(params: { + register: RegisterSlackModalHandler; + matcher: RegExp; + ctx: SlackMonitorContext; + interactionType: SlackModalInteractionKind; + contextPrefix: "slack:interaction:view" | "slack:interaction:view-closed"; +}) { + params.register(params.matcher, async ({ ack, body }: SlackModalEventHandlerArgs) => { + await ack(); + emitSlackModalLifecycleEvent({ + ctx: params.ctx, + body: body as SlackModalBody, + interactionType: params.interactionType, + contextPrefix: params.contextPrefix, + }); + }); +} + export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContext }) { const { ctx } = params; if (typeof ctx.app.action !== "function") { @@ -646,27 +669,20 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex if (typeof ctx.app.view !== "function") { return; } + const modalMatcher = new RegExp(`^${OPENCLAW_ACTION_PREFIX}`); // Handle OpenClaw modal submissions with callback_ids scoped by our prefix. - ctx.app.view( - new RegExp(`^${OPENCLAW_ACTION_PREFIX}`), - async ({ ack, body }: { ack: () => Promise; body: unknown }) => { - await ack(); - emitSlackModalLifecycleEvent({ - ctx, - body: body as SlackModalBody, - interactionType: "view_submission", - contextPrefix: "slack:interaction:view", - }); - }, - ); + registerModalLifecycleHandler({ + register: (matcher, handler) => ctx.app.view(matcher, handler), + matcher: modalMatcher, + ctx, + interactionType: "view_submission", + contextPrefix: "slack:interaction:view", + }); const viewClosed = ( ctx.app as unknown as { - viewClosed?: ( - matcher: RegExp, - handler: (args: { ack: () => Promise; body: unknown }) => Promise, - ) => void; + viewClosed?: RegisterSlackModalHandler; } ).viewClosed; if (typeof viewClosed !== "function") { @@ -674,16 +690,11 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex } // Handle modal close events so agent workflows can react to cancelled forms. - viewClosed( - new RegExp(`^${OPENCLAW_ACTION_PREFIX}`), - async ({ ack, body }: { ack: () => Promise; body: unknown }) => { - await ack(); - emitSlackModalLifecycleEvent({ - ctx, - body: body as SlackModalBody, - interactionType: "view_closed", - contextPrefix: "slack:interaction:view-closed", - }); - }, - ); + registerModalLifecycleHandler({ + register: viewClosed, + matcher: modalMatcher, + ctx, + interactionType: "view_closed", + contextPrefix: "slack:interaction:view-closed", + }); } diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 922f873d8..6fa51f993 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -292,17 +292,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag hasStreamedMessage = false; } - const replyThreadTs = replyPlan.nextThreadTs(); - await deliverReplies({ - replies: [payload], - target: prepared.replyTarget, - token: ctx.botToken, - accountId: account.accountId, - runtime, - textLimit: ctx.textLimit, - replyThreadTs, - }); - replyPlan.markSent(); + await deliverNormally(payload); }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); @@ -362,6 +352,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag draftStream.update(trimmed); hasStreamedMessage = true; }; + const onDraftBoundary = + useStreaming || !previewStreamingEnabled + ? undefined + : async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; + } + }; const { queuedFinal, counts } = await dispatchInboundMessage({ ctx: prepared.ctxPayload, @@ -384,32 +386,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag : async (payload) => { updateDraftFromPartial(payload.text); }, - onAssistantMessageStart: useStreaming - ? undefined - : !previewStreamingEnabled - ? undefined - : async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, - onReasoningEnd: useStreaming - ? undefined - : !previewStreamingEnabled - ? undefined - : async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, + onAssistantMessageStart: onDraftBoundary, + onReasoningEnd: onDraftBoundary, }, }); await draftStream.flush(); diff --git a/src/slack/monitor/message-handler/prepare.test.ts b/src/slack/monitor/message-handler/prepare.test.ts index 836a68f7e..7123ea7f9 100644 --- a/src/slack/monitor/message-handler/prepare.test.ts +++ b/src/slack/monitor/message-handler/prepare.test.ts @@ -168,6 +168,19 @@ describe("slack prepareSlackMessage inbound contract", () => { }; } + function createThreadReplyMessage(overrides: Partial): SlackMessageEvent { + return createSlackMessage({ + channel: "C123", + channel_type: "channel", + thread_ts: "100.000", + ...overrides, + }); + } + + function prepareThreadMessage(ctx: SlackMonitorContext, overrides: Partial) { + return prepareMessageWith(ctx, createThreadAccount(), createThreadReplyMessage(overrides)); + } + it("produces a finalized MsgContext", async () => { const message: SlackMessageEvent = { channel: "D123", @@ -298,17 +311,10 @@ describe("slack prepareSlackMessage inbound contract", () => { }); slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" }); - const prepared = await prepareMessageWith( - slackCtx, - createThreadAccount(), - createSlackMessage({ - channel: "C123", - channel_type: "channel", - text: "current message", - ts: "101.000", - thread_ts: "100.000", - }), - ); + const prepared = await prepareThreadMessage(slackCtx, { + text: "current message", + ts: "101.000", + }); expect(prepared).toBeTruthy(); expect(prepared!.ctxPayload.IsFirstThreadTurn).toBe(true); @@ -347,17 +353,11 @@ describe("slack prepareSlackMessage inbound contract", () => { slackCtx.resolveUserName = async () => ({ name: "Alice" }); slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" }); - const prepared = await prepareMessageWith( - slackCtx, - createThreadAccount(), - createSlackMessage({ - channel: "C123", - channel_type: "channel", - text: "reply in old thread", - ts: "201.000", - thread_ts: "200.000", - }), - ); + const prepared = await prepareThreadMessage(slackCtx, { + text: "reply in old thread", + ts: "201.000", + thread_ts: "200.000", + }); expect(prepared).toBeTruthy(); expect(prepared!.ctxPayload.IsFirstThreadTurn).toBeUndefined(); diff --git a/src/slack/monitor/slash.ts b/src/slack/monitor/slash.ts index f73c5bb92..fa3dd66c4 100644 --- a/src/slack/monitor/slash.ts +++ b/src/slack/monitor/slash.ts @@ -147,6 +147,13 @@ function parseSlackCommandArgValue(raw?: string | null): { }; } +function buildSlackArgMenuOptions(choices: EncodedMenuChoice[]) { + return choices.map((choice) => ({ + text: { type: "plain_text", text: choice.label.slice(0, 75) }, + value: choice.value, + })); +} + function buildSlackCommandArgMenuBlocks(params: { title: string; command: string; @@ -185,10 +192,7 @@ function buildSlackCommandArgMenuBlocks(params: { type: "overflow", action_id: SLACK_COMMAND_ARG_ACTION_ID, confirm: buildSlackArgMenuConfirm({ command: params.command, arg: params.arg }), - options: encodedChoices.map((choice) => ({ - text: { type: "plain_text", text: choice.label.slice(0, 75) }, - value: choice.value, - })), + options: buildSlackArgMenuOptions(encodedChoices), }, ], }, @@ -238,10 +242,7 @@ function buildSlackCommandArgMenuBlocks(params: { text: index === 0 ? `Choose ${params.arg}` : `Choose ${params.arg} (${index + 1})`, }, - options: choices.map((choice) => ({ - text: { type: "plain_text", text: choice.label.slice(0, 75) }, - value: choice.value, - })), + options: buildSlackArgMenuOptions(choices), }, ], }), diff --git a/src/telegram/bot/delivery.resolve-media-retry.test.ts b/src/telegram/bot/delivery.resolve-media-retry.test.ts index 0fec410dc..2c54396a8 100644 --- a/src/telegram/bot/delivery.resolve-media-retry.test.ts +++ b/src/telegram/bot/delivery.resolve-media-retry.test.ts @@ -82,6 +82,19 @@ function setupTransientGetFileRetry() { return getFile; } +function createFileTooBigError(): Error { + return new Error("GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)"); +} + +async function expectTransientGetFileRetrySuccess() { + const getFile = setupTransientGetFileRetry(); + const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN); + await flushRetryTimers(); + const result = await promise; + expect(getFile).toHaveBeenCalledTimes(2); + return result; +} + async function flushRetryTimers() { await vi.runAllTimersAsync(); } @@ -98,12 +111,7 @@ describe("resolveMedia getFile retry", () => { }); it("retries getFile on transient failure and succeeds on second attempt", async () => { - const getFile = setupTransientGetFileRetry(); - const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN); - await flushRetryTimers(); - const result = await promise; - - expect(getFile).toHaveBeenCalledTimes(2); + const result = await expectTransientGetFileRetrySuccess(); expect(result).toEqual( expect.objectContaining({ path: "/tmp/file_0.oga", placeholder: "" }), ); @@ -135,15 +143,13 @@ describe("resolveMedia getFile retry", () => { }); it("does not retry 'file is too big' error (400 Bad Request) and returns null", async () => { - // Simulate Telegram Bot API error when file exceeds 20MB limit - const fileTooBigError = new Error( - "GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)", - ); + // Simulate Telegram Bot API error when file exceeds 20MB limit. + const fileTooBigError = createFileTooBigError(); const getFile = vi.fn().mockRejectedValue(fileTooBigError); const result = await resolveMedia(makeCtx("video", getFile), MAX_MEDIA_BYTES, BOT_TOKEN); - // Should NOT retry - "file is too big" is a permanent error, not transient + // Should NOT retry - "file is too big" is a permanent error, not transient. expect(getFile).toHaveBeenCalledTimes(1); expect(result).toBeNull(); }); @@ -166,10 +172,7 @@ describe("resolveMedia getFile retry", () => { it.each(["audio", "voice"] as const)( "returns null for %s when file is too big", async (mediaField) => { - const fileTooBigError = new Error( - "GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)", - ); - const getFile = vi.fn().mockRejectedValue(fileTooBigError); + const getFile = vi.fn().mockRejectedValue(createFileTooBigError()); const result = await resolveMedia(makeCtx(mediaField, getFile), MAX_MEDIA_BYTES, BOT_TOKEN); @@ -178,14 +181,17 @@ describe("resolveMedia getFile retry", () => { }, ); - it("still retries transient errors even after encountering file too big in different call", async () => { - const getFile = setupTransientGetFileRetry(); - const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN); - await flushRetryTimers(); - const result = await promise; + it("throws when getFile returns no file_path", async () => { + const getFile = vi.fn().mockResolvedValue({}); + await expect( + resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN), + ).rejects.toThrow("Telegram getFile returned no file_path"); + expect(getFile).toHaveBeenCalledTimes(1); + }); - // Should retry transient errors - expect(getFile).toHaveBeenCalledTimes(2); + it("still retries transient errors even after encountering file too big in different call", async () => { + const result = await expectTransientGetFileRetrySuccess(); + // Should retry transient errors. expect(result).not.toBeNull(); }); }); diff --git a/src/telegram/bot/delivery.test.ts b/src/telegram/bot/delivery.test.ts index f211b804d..d4606ac14 100644 --- a/src/telegram/bot/delivery.test.ts +++ b/src/telegram/bot/delivery.test.ts @@ -71,6 +71,25 @@ function createSendMessageHarness(messageId = 4) { return { runtime, sendMessage, bot }; } +function createVoiceMessagesForbiddenError() { + return new Error( + "GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)", + ); +} + +function createVoiceFailureHarness(params: { + voiceError: Error; + sendMessageResult?: { message_id: number; chat: { id: string } }; +}) { + const runtime = createRuntime(); + const sendVoice = vi.fn().mockRejectedValue(params.voiceError); + const sendMessage = params.sendMessageResult + ? vi.fn().mockResolvedValue(params.sendMessageResult) + : vi.fn(); + const bot = createBot({ sendVoice, sendMessage }); + return { runtime, sendVoice, sendMessage, bot }; +} + describe("deliverReplies", () => { beforeEach(() => { loadWebMedia.mockClear(); @@ -258,19 +277,13 @@ describe("deliverReplies", () => { }); it("falls back to text when sendVoice fails with VOICE_MESSAGES_FORBIDDEN", async () => { - const runtime = createRuntime(); - const sendVoice = vi - .fn() - .mockRejectedValue( - new Error( - "GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)", - ), - ); - const sendMessage = vi.fn().mockResolvedValue({ - message_id: 5, - chat: { id: "123" }, + const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({ + voiceError: createVoiceMessagesForbiddenError(), + sendMessageResult: { + message_id: 5, + chat: { id: "123" }, + }, }); - const bot = createBot({ sendVoice, sendMessage }); mockMediaLoad("note.ogg", "audio/ogg", "voice"); @@ -315,16 +328,9 @@ describe("deliverReplies", () => { }); it("rethrows VOICE_MESSAGES_FORBIDDEN when no text fallback is available", async () => { - const runtime = createRuntime(); - const sendVoice = vi - .fn() - .mockRejectedValue( - new Error( - "GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)", - ), - ); - const sendMessage = vi.fn(); - const bot = createBot({ sendVoice, sendMessage }); + const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({ + voiceError: createVoiceMessagesForbiddenError(), + }); mockMediaLoad("note.ogg", "audio/ogg", "voice"); diff --git a/src/web/auto-reply/deliver-reply.test.ts b/src/web/auto-reply/deliver-reply.test.ts index 385fcd65a..24f6e2eb8 100644 --- a/src/web/auto-reply/deliver-reply.test.ts +++ b/src/web/auto-reply/deliver-reply.test.ts @@ -52,6 +52,18 @@ function mockFirstSendMediaFailure(msg: WebInboundMsg, message: string) { ).mockRejectedValueOnce(new Error(message)); } +function mockFirstReplyFailure(msg: WebInboundMsg, message: string) { + (msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce( + new Error(message), + ); +} + +function mockSecondReplySuccess(msg: WebInboundMsg) { + (msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce( + undefined, + ); +} + const replyLogger = { info: vi.fn(), warn: vi.fn(), @@ -76,60 +88,31 @@ describe("deliverWebReply", () => { expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (text)"); }); - it("retries text send on transient failure", async () => { - const msg = makeMsg(); - (msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce( - new Error("connection closed"), - ); - (msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce( - undefined, - ); + it.each(["connection closed", "operation timed out"])( + "retries text send on transient failure: %s", + async (errorMessage) => { + const msg = makeMsg(); + mockFirstReplyFailure(msg, errorMessage); + mockSecondReplySuccess(msg); - await deliverWebReply({ - replyResult: { text: "hi" }, - msg, - maxMediaBytes: 1024 * 1024, - textLimit: 200, - replyLogger, - skipLog: true, - }); + await deliverWebReply({ + replyResult: { text: "hi" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); - expect(msg.reply).toHaveBeenCalledTimes(2); - expect(sleep).toHaveBeenCalledWith(500); - }); - - it("retries text send when error contains timed out", async () => { - const msg = makeMsg(); - (msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce( - new Error("operation timed out"), - ); - (msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce( - undefined, - ); - - await deliverWebReply({ - replyResult: { text: "hi" }, - msg, - maxMediaBytes: 1024 * 1024, - textLimit: 200, - replyLogger, - skipLog: true, - }); - - expect(msg.reply).toHaveBeenCalledTimes(2); - expect(sleep).toHaveBeenCalledWith(500); - }); + expect(msg.reply).toHaveBeenCalledTimes(2); + expect(sleep).toHaveBeenCalledWith(500); + }, + ); it("sends image media with caption and then remaining text", async () => { const msg = makeMsg(); const mediaLocalRoots = ["/tmp/workspace-work"]; - ( - loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void } - ).mockResolvedValueOnce({ - buffer: Buffer.from("img"), - contentType: "image/jpeg", - kind: "image", - }); + mockLoadedImageMedia(); await deliverWebReply({ replyResult: { text: "aaaaaa", mediaUrl: "http://example.com/img.jpg" }, diff --git a/src/web/auto-reply/heartbeat-runner.test.ts b/src/web/auto-reply/heartbeat-runner.test.ts index e6e5d234f..78014787a 100644 --- a/src/web/auto-reply/heartbeat-runner.test.ts +++ b/src/web/auto-reply/heartbeat-runner.test.ts @@ -95,6 +95,13 @@ describe("runWebHeartbeatOnce", () => { let replyResolver: typeof getReplyFromConfig; const getModules = async () => await import("./heartbeat-runner.js"); + const buildRunArgs = (overrides: Record = {}) => ({ + cfg: { agents: { defaults: {} }, session: {} } as never, + to: "+123", + sender, + replyResolver, + ...overrides, + }); beforeEach(() => { state.visibility = { showAlerts: true, showOk: true, useIndicator: false }; @@ -117,26 +124,14 @@ describe("runWebHeartbeatOnce", () => { it("supports manual override body dry-run without sending", async () => { const { runWebHeartbeatOnce } = await getModules(); - await runWebHeartbeatOnce({ - cfg: { agents: { defaults: {} }, session: {} } as never, - to: "+123", - sender, - replyResolver, - overrideBody: "hello", - dryRun: true, - }); + await runWebHeartbeatOnce(buildRunArgs({ overrideBody: "hello", dryRun: true })); expect(senderMock).not.toHaveBeenCalled(); expect(state.events).toHaveLength(0); }); it("sends HEARTBEAT_OK when reply is empty and showOk is enabled", async () => { const { runWebHeartbeatOnce } = await getModules(); - await runWebHeartbeatOnce({ - cfg: { agents: { defaults: {} }, session: {} } as never, - to: "+123", - sender, - replyResolver, - }); + await runWebHeartbeatOnce(buildRunArgs()); expect(senderMock).toHaveBeenCalledWith("+123", HEARTBEAT_TOKEN, { verbose: false }); expect(state.events).toEqual( expect.arrayContaining([expect.objectContaining({ status: "ok-empty", silent: false })]), @@ -145,13 +140,12 @@ describe("runWebHeartbeatOnce", () => { it("injects a cron-style Current time line into the heartbeat prompt", async () => { const { runWebHeartbeatOnce } = await getModules(); - await runWebHeartbeatOnce({ - cfg: { agents: { defaults: { heartbeat: { prompt: "Ops check" } } }, session: {} } as never, - to: "+123", - sender, - replyResolver, - dryRun: true, - }); + await runWebHeartbeatOnce( + buildRunArgs({ + cfg: { agents: { defaults: { heartbeat: { prompt: "Ops check" } } }, session: {} } as never, + dryRun: true, + }), + ); expect(replyResolver).toHaveBeenCalledTimes(1); const ctx = replyResolverMock.mock.calls[0]?.[0]; expect(ctx?.Body).toContain("Ops check"); @@ -161,12 +155,7 @@ describe("runWebHeartbeatOnce", () => { it("treats heartbeat token-only replies as ok-token and preserves session updatedAt", async () => { replyResolverMock.mockResolvedValue({ text: HEARTBEAT_TOKEN }); const { runWebHeartbeatOnce } = await getModules(); - await runWebHeartbeatOnce({ - cfg: { agents: { defaults: {} }, session: {} } as never, - to: "+123", - sender, - replyResolver, - }); + await runWebHeartbeatOnce(buildRunArgs()); expect(state.store.k?.updatedAt).toBe(123); expect(senderMock).toHaveBeenCalledWith("+123", HEARTBEAT_TOKEN, { verbose: false }); expect(state.events).toEqual( @@ -178,12 +167,7 @@ describe("runWebHeartbeatOnce", () => { state.visibility = { showAlerts: false, showOk: true, useIndicator: true }; replyResolverMock.mockResolvedValue({ text: "ALERT" }); const { runWebHeartbeatOnce } = await getModules(); - await runWebHeartbeatOnce({ - cfg: { agents: { defaults: {} }, session: {} } as never, - to: "+123", - sender, - replyResolver, - }); + await runWebHeartbeatOnce(buildRunArgs()); expect(senderMock).not.toHaveBeenCalled(); expect(state.events).toEqual( expect.arrayContaining([ @@ -196,14 +180,7 @@ describe("runWebHeartbeatOnce", () => { replyResolverMock.mockResolvedValue({ text: "ALERT" }); senderMock.mockRejectedValueOnce(new Error("nope")); const { runWebHeartbeatOnce } = await getModules(); - await expect( - runWebHeartbeatOnce({ - cfg: { agents: { defaults: {} }, session: {} } as never, - to: "+123", - sender, - replyResolver, - }), - ).rejects.toThrow("nope"); + await expect(runWebHeartbeatOnce(buildRunArgs())).rejects.toThrow("nope"); expect(state.events).toEqual( expect.arrayContaining([ expect.objectContaining({ status: "failed", reason: "ERR:Error: nope" }), diff --git a/src/web/auto-reply/monitor/group-gating.ts b/src/web/auto-reply/monitor/group-gating.ts index a2ae98eea..d1867ed24 100644 --- a/src/web/auto-reply/monitor/group-gating.ts +++ b/src/web/auto-reply/monitor/group-gating.ts @@ -19,6 +19,22 @@ export type GroupHistoryEntry = { senderJid?: string; }; +type ApplyGroupGatingParams = { + cfg: ReturnType; + msg: WebInboundMsg; + conversationId: string; + groupHistoryKey: string; + agentId: string; + sessionKey: string; + baseMentionConfig: MentionConfig; + authDir?: string; + groupHistories: Map; + groupHistoryLimit: number; + groupMemberNames: Map>; + logVerbose: (msg: string) => void; + replyLogger: { debug: (obj: unknown, msg: string) => void }; +}; + function isOwnerSender(baseMentionConfig: MentionConfig, msg: WebInboundMsg) { const sender = normalizeE164(msg.senderE164 ?? ""); if (!sender) { @@ -52,21 +68,18 @@ function recordPendingGroupHistoryEntry(params: { }); } -export function applyGroupGating(params: { - cfg: ReturnType; - msg: WebInboundMsg; - conversationId: string; - groupHistoryKey: string; - agentId: string; - sessionKey: string; - baseMentionConfig: MentionConfig; - authDir?: string; - groupHistories: Map; - groupHistoryLimit: number; - groupMemberNames: Map>; - logVerbose: (msg: string) => void; - replyLogger: { debug: (obj: unknown, msg: string) => void }; -}) { +function skipGroupMessageAndStoreHistory(params: ApplyGroupGatingParams, verboseMessage: string) { + params.logVerbose(verboseMessage); + recordPendingGroupHistoryEntry({ + msg: params.msg, + groupHistories: params.groupHistories, + groupHistoryKey: params.groupHistoryKey, + groupHistoryLimit: params.groupHistoryLimit, + }); + return { shouldProcess: false } as const; +} + +export function applyGroupGating(params: ApplyGroupGatingParams) { const groupPolicy = resolveGroupPolicyFor(params.cfg, params.conversationId); if (groupPolicy.allowlistEnabled && !groupPolicy.allowed) { params.logVerbose(`Skipping group message ${params.conversationId} (not in allowlist)`); @@ -91,14 +104,10 @@ export function applyGroupGating(params: { const shouldBypassMention = owner && hasControlCommand(commandBody, params.cfg); if (activationCommand.hasCommand && !owner) { - params.logVerbose(`Ignoring /activation from non-owner in group ${params.conversationId}`); - recordPendingGroupHistoryEntry({ - msg: params.msg, - groupHistories: params.groupHistories, - groupHistoryKey: params.groupHistoryKey, - groupHistoryLimit: params.groupHistoryLimit, - }); - return { shouldProcess: false }; + return skipGroupMessageAndStoreHistory( + params, + `Ignoring /activation from non-owner in group ${params.conversationId}`, + ); } const mentionDebug = debugMention(params.msg, mentionConfig, params.authDir); @@ -137,16 +146,10 @@ export function applyGroupGating(params: { }); params.msg.wasMentioned = mentionGate.effectiveWasMentioned; if (!shouldBypassMention && requireMention && mentionGate.shouldSkip) { - params.logVerbose( + return skipGroupMessageAndStoreHistory( + params, `Group message stored for context (no mention detected) in ${params.conversationId}: ${params.msg.body}`, ); - recordPendingGroupHistoryEntry({ - msg: params.msg, - groupHistories: params.groupHistories, - groupHistoryKey: params.groupHistoryKey, - groupHistoryLimit: params.groupHistoryLimit, - }); - return { shouldProcess: false }; } return { shouldProcess: true }; diff --git a/src/web/auto-reply/monitor/group-members.test.ts b/src/web/auto-reply/monitor/group-members.test.ts new file mode 100644 index 000000000..2ed33780b --- /dev/null +++ b/src/web/auto-reply/monitor/group-members.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from "vitest"; +import { formatGroupMembers, noteGroupMember } from "./group-members.js"; + +describe("noteGroupMember", () => { + it("normalizes member phone numbers before storing", () => { + const groupMemberNames = new Map>(); + + noteGroupMember(groupMemberNames, "g1", "+1 (555) 123-4567", "Alice"); + + expect(groupMemberNames.get("g1")?.get("+15551234567")).toBe("Alice"); + }); + + it("ignores incomplete member values", () => { + const groupMemberNames = new Map>(); + + noteGroupMember(groupMemberNames, "g1", undefined, "Alice"); + noteGroupMember(groupMemberNames, "g1", "+15551234567", undefined); + + expect(groupMemberNames.get("g1")).toBeUndefined(); + }); +}); + +describe("formatGroupMembers", () => { + it("deduplicates participants and appends named roster members", () => { + const roster = new Map([ + ["+16660000000", "Bob"], + ["+17770000000", "Carol"], + ]); + + const formatted = formatGroupMembers({ + participants: ["+1 (555) 000-0000", "+15550000000", "+16660000000"], + roster, + }); + + expect(formatted).toBe("+15550000000, Bob (+16660000000), Carol (+17770000000)"); + }); + + it("falls back to sender when no participants or roster are available", () => { + const formatted = formatGroupMembers({ + participants: [], + roster: undefined, + fallbackE164: "+1 (555) 222-3333", + }); + + expect(formatted).toBe("+15552223333"); + }); + + it("returns undefined when no members can be resolved", () => { + expect( + formatGroupMembers({ + participants: [], + roster: undefined, + }), + ).toBeUndefined(); + }); +}); diff --git a/src/web/auto-reply/monitor/group-members.ts b/src/web/auto-reply/monitor/group-members.ts index dc69935e9..5564c4b87 100644 --- a/src/web/auto-reply/monitor/group-members.ts +++ b/src/web/auto-reply/monitor/group-members.ts @@ -1,5 +1,16 @@ import { normalizeE164 } from "../../../utils.js"; +function appendNormalizedUnique(entries: Iterable, seen: Set, ordered: string[]) { + for (const entry of entries) { + const normalized = normalizeE164(entry) ?? entry; + if (!normalized || seen.has(normalized)) { + continue; + } + seen.add(normalized); + ordered.push(normalized); + } +} + export function noteGroupMember( groupMemberNames: Map>, conversationId: string, @@ -31,27 +42,10 @@ export function formatGroupMembers(params: { const seen = new Set(); const ordered: string[] = []; if (participants?.length) { - for (const entry of participants) { - if (!entry) { - continue; - } - const normalized = normalizeE164(entry) ?? entry; - if (!normalized || seen.has(normalized)) { - continue; - } - seen.add(normalized); - ordered.push(normalized); - } + appendNormalizedUnique(participants, seen, ordered); } if (roster) { - for (const entry of roster.keys()) { - const normalized = normalizeE164(entry) ?? entry; - if (!normalized || seen.has(normalized)) { - continue; - } - seen.add(normalized); - ordered.push(normalized); - } + appendNormalizedUnique(roster.keys(), seen, ordered); } if (ordered.length === 0 && fallbackE164) { const normalized = normalizeE164(fallbackE164) ?? fallbackE164; diff --git a/src/web/auto-reply/web-auto-reply-monitor.test.ts b/src/web/auto-reply/web-auto-reply-monitor.test.ts index bd8e9e3fb..925d430de 100644 --- a/src/web/auto-reply/web-auto-reply-monitor.test.ts +++ b/src/web/auto-reply/web-auto-reply-monitor.test.ts @@ -83,6 +83,17 @@ function createGroupMessage(overrides: Record = {}) { }; } +function makeOwnerGroupConfig() { + return makeConfig({ + channels: { + whatsapp: { + allowFrom: ["+111"], + groups: { "*": { requireMention: true } }, + }, + }, + }); +} + function makeInboundCfg(messagePrefix = "") { return { agents: { defaults: { workspace: "/tmp/openclaw" } }, @@ -114,21 +125,15 @@ describe("applyGroupGating", () => { expect(result.shouldProcess).toBe(true); }); - it("bypasses mention gating for owner /new in group chats", () => { - const cfg = makeConfig({ - channels: { - whatsapp: { - allowFrom: ["+111"], - groups: { "*": { requireMention: true } }, - }, - }, - }); - + it.each([ + { id: "g-new", command: "/new" }, + { id: "g-status", command: "/status" }, + ])("bypasses mention gating for owner $command in group chats", ({ id, command }) => { const { result } = runGroupGating({ - cfg, + cfg: makeOwnerGroupConfig(), msg: createGroupMessage({ - id: "g-new", - body: "/new", + id, + body: command, senderE164: "+111", senderName: "Owner", }), @@ -161,29 +166,6 @@ describe("applyGroupGating", () => { expect(groupHistories.get("whatsapp:default:group:123@g.us")?.length).toBe(1); }); - it("bypasses mention gating for owner /status in group chats", () => { - const cfg = makeConfig({ - channels: { - whatsapp: { - allowFrom: ["+111"], - groups: { "*": { requireMention: true } }, - }, - }, - }); - - const { result } = runGroupGating({ - cfg, - msg: createGroupMessage({ - id: "g-status", - body: "/status", - senderE164: "+111", - senderName: "Owner", - }), - }); - - expect(result.shouldProcess).toBe(true); - }); - it("uses per-agent mention patterns for group gating (routing + mentionPatterns)", () => { const cfg = makeConfig({ channels: { diff --git a/src/web/inbound/media.node.test.ts b/src/web/inbound/media.node.test.ts index 5e9b7b991..71d75309f 100644 --- a/src/web/inbound/media.node.test.ts +++ b/src/web/inbound/media.node.test.ts @@ -17,6 +17,12 @@ const mockSock = { logger: { child: () => ({}) }, } as never; +async function expectMimetype(message: Record, expected: string) { + const result = await downloadInboundMedia({ message } as never, mockSock); + expect(result).toBeDefined(); + expect(result?.mimetype).toBe(expected); +} + describe("downloadInboundMedia", () => { it("returns undefined for messages without media", async () => { const msg = { message: { conversation: "hello" } } as never; @@ -25,66 +31,26 @@ describe("downloadInboundMedia", () => { }); it("uses explicit mimetype from audioMessage when present", async () => { - const msg = { - message: { audioMessage: { mimetype: "audio/mp4", ptt: true } }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("audio/mp4"); + await expectMimetype({ audioMessage: { mimetype: "audio/mp4", ptt: true } }, "audio/mp4"); }); - it("defaults to audio/ogg for voice messages without explicit MIME", async () => { - const msg = { - message: { audioMessage: { ptt: true } }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("audio/ogg; codecs=opus"); - }); - - it("defaults to audio/ogg for audio messages without MIME or ptt flag", async () => { - const msg = { - message: { audioMessage: {} }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("audio/ogg; codecs=opus"); + it.each([ + { name: "voice messages without explicit MIME", audioMessage: { ptt: true } }, + { name: "audio messages without MIME or ptt flag", audioMessage: {} }, + ])("defaults to audio/ogg for $name", async ({ audioMessage }) => { + await expectMimetype({ audioMessage }, "audio/ogg; codecs=opus"); }); it("uses explicit mimetype from imageMessage when present", async () => { - const msg = { - message: { imageMessage: { mimetype: "image/png" } }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("image/png"); + await expectMimetype({ imageMessage: { mimetype: "image/png" } }, "image/png"); }); - it("defaults to image/jpeg for images without explicit MIME", async () => { - const msg = { - message: { imageMessage: {} }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("image/jpeg"); - }); - - it("defaults to video/mp4 for video messages without explicit MIME", async () => { - const msg = { - message: { videoMessage: {} }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("video/mp4"); - }); - - it("defaults to image/webp for sticker messages without explicit MIME", async () => { - const msg = { - message: { stickerMessage: {} }, - } as never; - const result = await downloadInboundMedia(msg, mockSock); - expect(result).toBeDefined(); - expect(result?.mimetype).toBe("image/webp"); + it.each([ + { name: "image", message: { imageMessage: {} }, mimetype: "image/jpeg" }, + { name: "video", message: { videoMessage: {} }, mimetype: "video/mp4" }, + { name: "sticker", message: { stickerMessage: {} }, mimetype: "image/webp" }, + ])("defaults MIME for $name messages without explicit MIME", async ({ message, mimetype }) => { + await expectMimetype(message, mimetype); }); it("preserves fileName from document messages", async () => {