refactor(runtime): consolidate followup, gateway, and provider dedupe paths

This commit is contained in:
Peter Steinberger
2026-02-22 14:06:03 +00:00
parent 38752338dc
commit d116bcfb14
36 changed files with 848 additions and 908 deletions

View File

@@ -60,8 +60,8 @@ describe("injectHistoryImagesIntoMessages", () => {
});
describe("resolvePromptBuildHookResult", () => {
it("reuses precomputed legacy before_agent_start result without invoking hook again", async () => {
const hookRunner = {
function createLegacyOnlyHookRunner() {
return {
hasHooks: vi.fn(
(hookName: "before_prompt_build" | "before_agent_start") =>
hookName === "before_agent_start",
@@ -69,6 +69,10 @@ describe("resolvePromptBuildHookResult", () => {
runBeforePromptBuild: vi.fn(async () => undefined),
runBeforeAgentStart: vi.fn(async () => ({ prependContext: "from-hook" })),
};
}
it("reuses precomputed legacy before_agent_start result without invoking hook again", async () => {
const hookRunner = createLegacyOnlyHookRunner();
const result = await resolvePromptBuildHookResult({
prompt: "hello",
messages: [],
@@ -85,14 +89,7 @@ describe("resolvePromptBuildHookResult", () => {
});
it("calls legacy hook when precomputed result is absent", async () => {
const hookRunner = {
hasHooks: vi.fn(
(hookName: "before_prompt_build" | "before_agent_start") =>
hookName === "before_agent_start",
),
runBeforePromptBuild: vi.fn(async () => undefined),
runBeforeAgentStart: vi.fn(async () => ({ prependContext: "from-hook" })),
};
const hookRunner = createLegacyOnlyHookRunner();
const messages = [{ role: "user", content: "ctx" }];
const result = await resolvePromptBuildHookResult({
prompt: "hello",

View File

@@ -2,9 +2,15 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import { formatBillingErrorMessage } from "../../pi-embedded-helpers.js";
import { makeAssistantMessageFixture } from "../../test-helpers/assistant-message-fixtures.js";
import { buildEmbeddedRunPayloads } from "./payloads.js";
import {
buildPayloads,
expectSinglePayloadText,
expectSingleToolErrorPayload,
} from "./payloads.test-helpers.js";
describe("buildEmbeddedRunPayloads", () => {
const OVERLOADED_FALLBACK_TEXT =
"The AI service is temporarily overloaded. Please try again in a moment.";
const errorJson =
'{"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"},"request_id":"req_011CX7DwS7tSvggaNHmefwWg"}';
const errorJsonPretty = `{
@@ -22,31 +28,25 @@ describe("buildEmbeddedRunPayloads", () => {
content: [{ type: "text", text: errorJson }],
...overrides,
});
type BuildPayloadParams = Parameters<typeof buildEmbeddedRunPayloads>[0];
const buildPayloads = (overrides: Partial<BuildPayloadParams> = {}) =>
buildEmbeddedRunPayloads({
assistantTexts: [],
toolMetas: [],
lastAssistant: undefined,
sessionKey: "session:telegram",
inlineToolResultsAllowed: false,
verboseLevel: "off",
reasoningLevel: "off",
toolResultFormat: "plain",
...overrides,
const makeStoppedAssistant = () =>
makeAssistant({
stopReason: "stop",
errorMessage: undefined,
content: [],
});
const expectOverloadedFallback = (payloads: ReturnType<typeof buildPayloads>) => {
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(OVERLOADED_FALLBACK_TEXT);
};
it("suppresses raw API error JSON when the assistant errored", () => {
const payloads = buildPayloads({
assistantTexts: [errorJson],
lastAssistant: makeAssistant({}),
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(
"The AI service is temporarily overloaded. Please try again in a moment.",
);
expectOverloadedFallback(payloads);
expect(payloads[0]?.isError).toBe(true);
expect(payloads.some((payload) => payload.text === errorJson)).toBe(false);
});
@@ -59,10 +59,7 @@ describe("buildEmbeddedRunPayloads", () => {
verboseLevel: "on",
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(
"The AI service is temporarily overloaded. Please try again in a moment.",
);
expectOverloadedFallback(payloads);
expect(payloads.some((payload) => payload.text === errorJsonPretty)).toBe(false);
});
@@ -71,10 +68,7 @@ describe("buildEmbeddedRunPayloads", () => {
lastAssistant: makeAssistant({ content: [{ type: "text", text: errorJsonPretty }] }),
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(
"The AI service is temporarily overloaded. Please try again in a moment.",
);
expectOverloadedFallback(payloads);
expect(payloads.some((payload) => payload.text?.includes("request_id"))).toBe(false);
});
@@ -108,15 +102,10 @@ describe("buildEmbeddedRunPayloads", () => {
it("does not suppress error-shaped JSON when the assistant did not error", () => {
const payloads = buildPayloads({
assistantTexts: [errorJsonPretty],
lastAssistant: makeAssistant({
stopReason: "stop",
errorMessage: undefined,
content: [],
}),
lastAssistant: makeStoppedAssistant(),
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(errorJsonPretty.trim());
expectSinglePayloadText(payloads, errorJsonPretty.trim());
});
it("adds a fallback error when a tool fails and no assistant output exists", () => {
@@ -133,31 +122,21 @@ describe("buildEmbeddedRunPayloads", () => {
it("does not add tool error fallback when assistant output exists", () => {
const payloads = buildPayloads({
assistantTexts: ["All good"],
lastAssistant: makeAssistant({
stopReason: "stop",
errorMessage: undefined,
content: [],
}),
lastAssistant: makeStoppedAssistant(),
lastToolError: { toolName: "browser", error: "tab not found" },
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe("All good");
expectSinglePayloadText(payloads, "All good");
});
it("adds completion fallback when tools run successfully without final assistant text", () => {
const payloads = buildPayloads({
toolMetas: [{ toolName: "write", meta: "/tmp/out.md" }],
lastAssistant: makeAssistant({
stopReason: "stop",
errorMessage: undefined,
content: [],
}),
lastAssistant: makeStoppedAssistant(),
});
expect(payloads).toHaveLength(1);
expectSinglePayloadText(payloads, "✅ Done.");
expect(payloads[0]?.isError).toBeUndefined();
expect(payloads[0]?.text).toBe("✅ Done.");
});
it("does not add completion fallback when the run still has a tool error", () => {
@@ -171,11 +150,7 @@ describe("buildEmbeddedRunPayloads", () => {
it("does not add completion fallback when no tools ran", () => {
const payloads = buildPayloads({
lastAssistant: makeAssistant({
stopReason: "stop",
errorMessage: undefined,
content: [],
}),
lastAssistant: makeStoppedAssistant(),
});
expect(payloads).toHaveLength(0);
@@ -199,10 +174,10 @@ describe("buildEmbeddedRunPayloads", () => {
verboseLevel: "on",
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.isError).toBe(true);
expect(payloads[0]?.text).toContain("Exec");
expect(payloads[0]?.text).toContain("code 1");
expectSingleToolErrorPayload(payloads, {
title: "Exec",
detail: "code 1",
});
});
it("does not add tool error fallback when assistant text exists after tool calls", () => {

View File

@@ -0,0 +1,41 @@
import { expect } from "vitest";
import { buildEmbeddedRunPayloads } from "./payloads.js";
export type BuildPayloadParams = Parameters<typeof buildEmbeddedRunPayloads>[0];
type RunPayloads = ReturnType<typeof buildEmbeddedRunPayloads>;
export function buildPayloads(overrides: Partial<BuildPayloadParams> = {}) {
return buildEmbeddedRunPayloads({
assistantTexts: [],
toolMetas: [],
lastAssistant: undefined,
sessionKey: "session:telegram",
inlineToolResultsAllowed: false,
verboseLevel: "off",
reasoningLevel: "off",
toolResultFormat: "plain",
...overrides,
});
}
export function expectSinglePayloadText(
payloads: RunPayloads,
text: string,
expectedError?: boolean,
): void {
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe(text);
if (typeof expectedError === "boolean") {
expect(payloads[0]?.isError).toBe(expectedError);
}
}
export function expectSingleToolErrorPayload(
payloads: RunPayloads,
params: { title: string; detail: string },
): void {
expect(payloads).toHaveLength(1);
expect(payloads[0]?.isError).toBe(true);
expect(payloads[0]?.text).toContain(params.title);
expect(payloads[0]?.text).toContain(params.detail);
}

View File

@@ -1,21 +1,5 @@
import { describe, expect, it } from "vitest";
import { buildEmbeddedRunPayloads } from "./payloads.js";
type BuildPayloadParams = Parameters<typeof buildEmbeddedRunPayloads>[0];
function buildPayloads(overrides: Partial<BuildPayloadParams> = {}) {
return buildEmbeddedRunPayloads({
assistantTexts: [],
toolMetas: [],
lastAssistant: undefined,
sessionKey: "session:telegram",
inlineToolResultsAllowed: false,
verboseLevel: "off",
reasoningLevel: "off",
toolResultFormat: "plain",
...overrides,
});
}
import { buildPayloads, expectSingleToolErrorPayload } from "./payloads.test-helpers.js";
describe("buildEmbeddedRunPayloads tool-error warnings", () => {
it("suppresses exec tool errors when verbose mode is off", () => {
@@ -33,10 +17,10 @@ describe("buildEmbeddedRunPayloads tool-error warnings", () => {
verboseLevel: "on",
});
expect(payloads).toHaveLength(1);
expect(payloads[0]?.isError).toBe(true);
expect(payloads[0]?.text).toContain("Exec");
expect(payloads[0]?.text).toContain("command failed");
expectSingleToolErrorPayload(payloads, {
title: "Exec",
detail: "command failed",
});
});
it("keeps non-exec mutating tool failures visible", () => {

View File

@@ -96,22 +96,26 @@ function hasImageBlocks(content: ReadonlyArray<TextContent | ImageContent>): boo
return false;
}
function estimateTextAndImageChars(content: ReadonlyArray<TextContent | ImageContent>): number {
let chars = 0;
for (const block of content) {
if (block.type === "text") {
chars += block.text.length;
}
if (block.type === "image") {
chars += IMAGE_CHAR_ESTIMATE;
}
}
return chars;
}
function estimateMessageChars(message: AgentMessage): number {
if (message.role === "user") {
const content = message.content;
if (typeof content === "string") {
return content.length;
}
let chars = 0;
for (const b of content) {
if (b.type === "text") {
chars += b.text.length;
}
if (b.type === "image") {
chars += IMAGE_CHAR_ESTIMATE;
}
}
return chars;
return estimateTextAndImageChars(content);
}
if (message.role === "assistant") {
@@ -135,16 +139,7 @@ function estimateMessageChars(message: AgentMessage): number {
}
if (message.role === "toolResult") {
let chars = 0;
for (const b of message.content) {
if (b.type === "text") {
chars += b.text.length;
}
if (b.type === "image") {
chars += IMAGE_CHAR_ESTIMATE;
}
}
return chars;
return estimateTextAndImageChars(message.content);
}
return 256;

View File

@@ -27,118 +27,79 @@ afterEach(() => {
});
describe("resolveCommandAuthorization", () => {
it("falls back from empty SenderId to SenderE164", () => {
function resolveWhatsAppAuthorization(params: {
from: string;
senderId?: string;
senderE164?: string;
allowFrom: string[];
}) {
const cfg = {
channels: { whatsapp: { allowFrom: ["+123"] } },
channels: { whatsapp: { allowFrom: params.allowFrom } },
} as OpenClawConfig;
const ctx = {
Provider: "whatsapp",
Surface: "whatsapp",
From: "whatsapp:+999",
SenderId: "",
SenderE164: "+123",
From: params.from,
SenderId: params.senderId,
SenderE164: params.senderE164,
} as MsgContext;
const auth = resolveCommandAuthorization({
return resolveCommandAuthorization({
ctx,
cfg,
commandAuthorized: true,
});
}
expect(auth.senderId).toBe("+123");
expect(auth.isAuthorizedSender).toBe(true);
});
it("falls back from whitespace SenderId to SenderE164", () => {
const cfg = {
channels: { whatsapp: { allowFrom: ["+123"] } },
} as OpenClawConfig;
const ctx = {
Provider: "whatsapp",
Surface: "whatsapp",
From: "whatsapp:+999",
SenderId: " ",
SenderE164: "+123",
} as MsgContext;
const auth = resolveCommandAuthorization({
ctx,
cfg,
commandAuthorized: true,
it.each([
{
name: "falls back from empty SenderId to SenderE164",
from: "whatsapp:+999",
senderId: "",
senderE164: "+123",
allowFrom: ["+123"],
expectedSenderId: "+123",
},
{
name: "falls back from whitespace SenderId to SenderE164",
from: "whatsapp:+999",
senderId: " ",
senderE164: "+123",
allowFrom: ["+123"],
expectedSenderId: "+123",
},
{
name: "falls back to From when SenderId and SenderE164 are whitespace",
from: "whatsapp:+999",
senderId: " ",
senderE164: " ",
allowFrom: ["+999"],
expectedSenderId: "+999",
},
{
name: "falls back from un-normalizable SenderId to SenderE164",
from: "whatsapp:+999",
senderId: "wat",
senderE164: "+123",
allowFrom: ["+123"],
expectedSenderId: "+123",
},
{
name: "prefers SenderE164 when SenderId does not match allowFrom",
from: "whatsapp:120363401234567890@g.us",
senderId: "123@lid",
senderE164: "+41796666864",
allowFrom: ["+41796666864"],
expectedSenderId: "+41796666864",
},
])("$name", ({ from, senderId, senderE164, allowFrom, expectedSenderId }) => {
const auth = resolveWhatsAppAuthorization({
from,
senderId,
senderE164,
allowFrom,
});
expect(auth.senderId).toBe("+123");
expect(auth.isAuthorizedSender).toBe(true);
});
it("falls back to From when SenderId and SenderE164 are whitespace", () => {
const cfg = {
channels: { whatsapp: { allowFrom: ["+999"] } },
} as OpenClawConfig;
const ctx = {
Provider: "whatsapp",
Surface: "whatsapp",
From: "whatsapp:+999",
SenderId: " ",
SenderE164: " ",
} as MsgContext;
const auth = resolveCommandAuthorization({
ctx,
cfg,
commandAuthorized: true,
});
expect(auth.senderId).toBe("+999");
expect(auth.isAuthorizedSender).toBe(true);
});
it("falls back from un-normalizable SenderId to SenderE164", () => {
const cfg = {
channels: { whatsapp: { allowFrom: ["+123"] } },
} as OpenClawConfig;
const ctx = {
Provider: "whatsapp",
Surface: "whatsapp",
From: "whatsapp:+999",
SenderId: "wat",
SenderE164: "+123",
} as MsgContext;
const auth = resolveCommandAuthorization({
ctx,
cfg,
commandAuthorized: true,
});
expect(auth.senderId).toBe("+123");
expect(auth.isAuthorizedSender).toBe(true);
});
it("prefers SenderE164 when SenderId does not match allowFrom", () => {
const cfg = {
channels: { whatsapp: { allowFrom: ["+41796666864"] } },
} as OpenClawConfig;
const ctx = {
Provider: "whatsapp",
Surface: "whatsapp",
From: "whatsapp:120363401234567890@g.us",
SenderId: "123@lid",
SenderE164: "+41796666864",
} as MsgContext;
const auth = resolveCommandAuthorization({
ctx,
cfg,
commandAuthorized: true,
});
expect(auth.senderId).toBe("+41796666864");
expect(auth.senderId).toBe(expectedSenderId);
expect(auth.isAuthorizedSender).toBe(true);
});

View File

@@ -148,6 +148,27 @@ describe("createFollowupRunner compaction", () => {
});
describe("createFollowupRunner messaging tool dedupe", () => {
function createMessagingDedupeRunner(
onBlockReply: (payload: unknown) => Promise<void>,
overrides: Partial<{
sessionEntry: SessionEntry;
sessionStore: Record<string, SessionEntry>;
sessionKey: string;
storePath: string;
}> = {},
) {
return createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
sessionEntry: overrides.sessionEntry,
sessionStore: overrides.sessionStore,
sessionKey: overrides.sessionKey,
storePath: overrides.storePath,
});
}
it("drops payloads already sent via messaging tool", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
@@ -156,12 +177,7 @@ describe("createFollowupRunner messaging tool dedupe", () => {
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
@@ -176,12 +192,7 @@ describe("createFollowupRunner messaging tool dedupe", () => {
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
@@ -197,12 +208,7 @@ describe("createFollowupRunner messaging tool dedupe", () => {
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun("slack"));
@@ -217,12 +223,7 @@ describe("createFollowupRunner messaging tool dedupe", () => {
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
@@ -238,12 +239,7 @@ describe("createFollowupRunner messaging tool dedupe", () => {
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
@@ -275,15 +271,11 @@ describe("createFollowupRunner messaging tool dedupe", () => {
},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun("slack"));

View File

@@ -1,5 +1,5 @@
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js";
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
function isRunAlreadyQueued(
@@ -57,11 +57,7 @@ export function enqueueFollowupRun(
}
export function getFollowupQueueDepth(key: string): number {
const cleaned = key.trim();
if (!cleaned) {
return 0;
}
const queue = FOLLOWUP_QUEUES.get(cleaned);
const queue = getExistingFollowupQueue(key);
if (!queue) {
return 0;
}

View File

@@ -20,6 +20,14 @@ export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
export const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
export function getExistingFollowupQueue(key: string): FollowupQueueState | undefined {
const cleaned = key.trim();
if (!cleaned) {
return undefined;
}
return FOLLOWUP_QUEUES.get(cleaned);
}
export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState {
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
@@ -57,10 +65,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
export function clearFollowupQueue(key: string): number {
const cleaned = key.trim();
if (!cleaned) {
return 0;
}
const queue = FOLLOWUP_QUEUES.get(cleaned);
const queue = getExistingFollowupQueue(cleaned);
if (!queue) {
return 0;
}

View File

@@ -0,0 +1,19 @@
import { resolveRuntimeGroupPolicy } from "./runtime-group-policy.js";
import type { GroupPolicy } from "./types.base.js";
export function resolveProviderRuntimeGroupPolicy(params: {
providerConfigPresent: boolean;
groupPolicy?: GroupPolicy;
defaultGroupPolicy?: GroupPolicy;
}): {
groupPolicy: GroupPolicy;
providerMissingFallbackApplied: boolean;
} {
return resolveRuntimeGroupPolicy({
providerConfigPresent: params.providerConfigPresent,
groupPolicy: params.groupPolicy,
defaultGroupPolicy: params.defaultGroupPolicy,
configuredFallbackPolicy: "open",
missingProviderFallbackPolicy: "allowlist",
});
}

View File

@@ -12,22 +12,20 @@ export const ExecApprovalsAllowlistEntrySchema = Type.Object(
{ additionalProperties: false },
);
export const ExecApprovalsDefaultsSchema = Type.Object(
{
security: Type.Optional(Type.String()),
ask: Type.Optional(Type.String()),
askFallback: Type.Optional(Type.String()),
autoAllowSkills: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
);
const ExecApprovalsPolicyFields = {
security: Type.Optional(Type.String()),
ask: Type.Optional(Type.String()),
askFallback: Type.Optional(Type.String()),
autoAllowSkills: Type.Optional(Type.Boolean()),
};
export const ExecApprovalsDefaultsSchema = Type.Object(ExecApprovalsPolicyFields, {
additionalProperties: false,
});
export const ExecApprovalsAgentSchema = Type.Object(
{
security: Type.Optional(Type.String()),
ask: Type.Optional(Type.String()),
askFallback: Type.Optional(Type.String()),
autoAllowSkills: Type.Optional(Type.Boolean()),
...ExecApprovalsPolicyFields,
allowlist: Type.Optional(Type.Array(ExecApprovalsAllowlistEntrySchema)),
},
{ additionalProperties: false },

View File

@@ -622,19 +622,22 @@ export function attachGatewayWsMessageHandler(params: {
`security audit: device access upgrade requested reason=${reason} device=${device.id} ip=${reportedClientIp ?? "unknown-ip"} auth=${authMethod} roleFrom=${formatAuditList(currentRoles)} roleTo=${role} scopesFrom=${formatAuditList(currentScopes)} scopesTo=${formatAuditList(scopes)} client=${connectParams.client.id} conn=${connId}`,
);
};
const clientAccessMetadata = {
displayName: connectParams.client.displayName,
platform: connectParams.client.platform,
clientId: connectParams.client.id,
clientMode: connectParams.client.mode,
role,
scopes,
remoteIp: reportedClientIp,
};
const requirePairing = async (
reason: "not-paired" | "role-upgrade" | "scope-upgrade",
) => {
const pairing = await requestDevicePairing({
deviceId: device.id,
publicKey: devicePublicKey,
displayName: connectParams.client.displayName,
platform: connectParams.client.platform,
clientId: connectParams.client.id,
clientMode: connectParams.client.mode,
role,
scopes,
remoteIp: reportedClientIp,
...clientAccessMetadata,
silent: isLocalClient && reason === "not-paired",
});
const context = buildRequestContext();
@@ -735,15 +738,7 @@ export function attachGatewayWsMessageHandler(params: {
}
}
await updatePairedDeviceMetadata(device.id, {
displayName: connectParams.client.displayName,
platform: connectParams.client.platform,
clientId: connectParams.client.id,
clientMode: connectParams.client.mode,
role,
scopes,
remoteIp: reportedClientIp,
});
await updatePairedDeviceMetadata(device.id, clientAccessMetadata);
}
}

View File

@@ -46,6 +46,12 @@ describe("boot-md handler", () => {
return cfg;
}
function setupSingleMainAgentBootConfig(cfg: unknown) {
listAgentIds.mockReturnValue(["main"]);
resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR);
return cfg;
}
beforeEach(() => {
vi.clearAllMocks();
});
@@ -82,9 +88,7 @@ describe("boot-md handler", () => {
});
it("runs boot for single default agent when no agents configured", async () => {
const cfg = {};
listAgentIds.mockReturnValue(["main"]);
resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR);
const cfg = setupSingleMainAgentBootConfig({});
runBootOnce.mockResolvedValue({ status: "skipped", reason: "missing" });
await runBootChecklist(makeEvent({ context: { cfg } }));
@@ -112,9 +116,7 @@ describe("boot-md handler", () => {
});
it("logs debug details when a per-agent boot run is skipped", async () => {
const cfg = { agents: { list: [{ id: "main" }] } };
listAgentIds.mockReturnValue(["main"]);
resolveAgentWorkspaceDir.mockReturnValue(MAIN_WORKSPACE_DIR);
const cfg = setupSingleMainAgentBootConfig({ agents: { list: [{ id: "main" }] } });
runBootOnce.mockResolvedValue({ status: "skipped", reason: "missing" });
await runBootChecklist(makeEvent({ context: { cfg } }));

View File

@@ -133,6 +133,33 @@ async function createSessionMemoryWorkspace(params?: {
return { tempDir, sessionsDir, activeSessionFile };
}
async function loadMemoryFromActiveSessionPointer(params: {
tempDir: string;
activeSessionFile: string;
}): Promise<string> {
const { memoryContent } = await runNewWithPreviousSessionEntry({
tempDir: params.tempDir,
previousSessionEntry: {
sessionId: "test-123",
sessionFile: params.activeSessionFile,
},
});
return memoryContent;
}
function expectMemoryConversation(params: {
memoryContent: string;
user: string;
assistant: string;
absent?: string;
}) {
expect(params.memoryContent).toContain(`user: ${params.user}`);
expect(params.memoryContent).toContain(`assistant: ${params.assistant}`);
if (params.absent) {
expect(params.memoryContent).not.toContain(params.absent);
}
}
describe("session-memory hook", () => {
it("skips non-command events", async () => {
const tempDir = await makeTempWorkspace("openclaw-session-memory-");
@@ -415,17 +442,17 @@ describe("session-memory hook", () => {
]),
});
const { memoryContent } = await runNewWithPreviousSessionEntry({
const memoryContent = await loadMemoryFromActiveSessionPointer({
tempDir,
previousSessionEntry: {
sessionId: "test-123",
sessionFile: activeSessionFile!,
},
activeSessionFile: activeSessionFile!,
});
expect(memoryContent).toContain("user: Newest rotated transcript");
expect(memoryContent).toContain("assistant: Newest summary");
expect(memoryContent).not.toContain("Older rotated transcript");
expectMemoryConversation({
memoryContent,
user: "Newest rotated transcript",
assistant: "Newest summary",
absent: "Older rotated transcript",
});
});
it("prefers active transcript when it is non-empty even with reset candidates", async () => {
@@ -448,17 +475,17 @@ describe("session-memory hook", () => {
]),
});
const { memoryContent } = await runNewWithPreviousSessionEntry({
const memoryContent = await loadMemoryFromActiveSessionPointer({
tempDir,
previousSessionEntry: {
sessionId: "test-123",
sessionFile: activeSessionFile!,
},
activeSessionFile: activeSessionFile!,
});
expect(memoryContent).toContain("user: Active transcript message");
expect(memoryContent).toContain("assistant: Active transcript summary");
expect(memoryContent).not.toContain("Reset fallback message");
expectMemoryConversation({
memoryContent,
user: "Active transcript message",
assistant: "Active transcript summary",
absent: "Reset fallback message",
});
});
it("handles empty session files gracefully", async () => {

View File

@@ -33,6 +33,25 @@ describe("loader", () => {
process.env.OPENCLAW_BUNDLED_HOOKS_DIR = "/nonexistent/bundled/hooks";
});
async function writeHandlerModule(
fileName: string,
code = "export default async function() {}",
): Promise<string> {
const handlerPath = path.join(tmpDir, fileName);
await fs.writeFile(handlerPath, code, "utf-8");
return handlerPath;
}
function createEnabledHooksConfig(
handlers?: Array<{ event: string; module: string; export?: string }>,
): OpenClawConfig {
return {
hooks: {
internal: handlers ? { enabled: true, handlers } : { enabled: true },
},
};
}
afterEach(async () => {
clearInternalHooks();
envSnapshot.restore();
@@ -67,27 +86,18 @@ describe("loader", () => {
it("should load a handler from a module", async () => {
// Create a test handler module
const handlerPath = path.join(tmpDir, "test-handler.js");
const handlerCode = `
export default async function(event) {
// Test handler
}
`;
await fs.writeFile(handlerPath, handlerCode, "utf-8");
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: path.basename(handlerPath),
},
],
},
const handlerPath = await writeHandlerModule("test-handler.js", handlerCode);
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: path.basename(handlerPath),
},
};
]);
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(1);
@@ -98,23 +108,13 @@ describe("loader", () => {
it("should load multiple handlers", async () => {
// Create test handler modules
const handler1Path = path.join(tmpDir, "handler1.js");
const handler2Path = path.join(tmpDir, "handler2.js");
const handler1Path = await writeHandlerModule("handler1.js");
const handler2Path = await writeHandlerModule("handler2.js");
await fs.writeFile(handler1Path, "export default async function() {}", "utf-8");
await fs.writeFile(handler2Path, "export default async function() {}", "utf-8");
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{ event: "command:new", module: path.basename(handler1Path) },
{ event: "command:stop", module: path.basename(handler2Path) },
],
},
},
};
const cfg = createEnabledHooksConfig([
{ event: "command:new", module: path.basename(handler1Path) },
{ event: "command:stop", module: path.basename(handler2Path) },
]);
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(2);
@@ -126,47 +126,32 @@ describe("loader", () => {
it("should support named exports", async () => {
// Create a handler module with named export
const handlerPath = path.join(tmpDir, "named-export.js");
const handlerCode = `
export const myHandler = async function(event) {
// Named export handler
}
`;
await fs.writeFile(handlerPath, handlerCode, "utf-8");
const handlerPath = await writeHandlerModule("named-export.js", handlerCode);
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: path.basename(handlerPath),
export: "myHandler",
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: path.basename(handlerPath),
export: "myHandler",
},
};
]);
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(1);
});
it("should handle module loading errors gracefully", async () => {
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: "missing-handler.js",
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: "missing-handler.js",
},
};
]);
// Should not throw and should return 0 (handler failed to load)
const count = await loadInternalHooks(cfg, tmpDir);
@@ -175,22 +160,17 @@ describe("loader", () => {
it("should handle non-function exports", async () => {
// Create a module with a non-function export
const handlerPath = path.join(tmpDir, "bad-export.js");
await fs.writeFile(handlerPath, 'export default "not a function";', "utf-8");
const handlerPath = await writeHandlerModule(
"bad-export.js",
'export default "not a function";',
);
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: path.basename(handlerPath),
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: path.basename(handlerPath),
},
};
]);
// Should not throw and should return 0 (handler is not a function)
const count = await loadInternalHooks(cfg, tmpDir);
@@ -199,25 +179,17 @@ describe("loader", () => {
it("should handle relative paths", async () => {
// Create a handler module
const handlerPath = path.join(tmpDir, "relative-handler.js");
await fs.writeFile(handlerPath, "export default async function() {}", "utf-8");
const handlerPath = await writeHandlerModule("relative-handler.js");
// Relative to workspaceDir (tmpDir)
const relativePath = path.relative(tmpDir, handlerPath);
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: relativePath,
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: relativePath,
},
};
]);
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(1);
@@ -225,7 +197,6 @@ describe("loader", () => {
it("should actually call the loaded handler", async () => {
// Create a handler that we can verify was called
const handlerPath = path.join(tmpDir, "callable-handler.js");
const handlerCode = `
let callCount = 0;
export default async function(event) {
@@ -235,21 +206,14 @@ describe("loader", () => {
return callCount;
}
`;
await fs.writeFile(handlerPath, handlerCode, "utf-8");
const handlerPath = await writeHandlerModule("callable-handler.js", handlerCode);
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: path.basename(handlerPath),
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: path.basename(handlerPath),
},
};
]);
await loadInternalHooks(cfg, tmpDir);
@@ -288,13 +252,7 @@ describe("loader", () => {
return;
}
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
},
},
};
const cfg = createEnabledHooksConfig();
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(0);
@@ -312,19 +270,12 @@ describe("loader", () => {
return;
}
const cfg: OpenClawConfig = {
hooks: {
internal: {
enabled: true,
handlers: [
{
event: "command:new",
module: "legacy-handler.js",
},
],
},
const cfg = createEnabledHooksConfig([
{
event: "command:new",
module: "legacy-handler.js",
},
};
]);
const count = await loadInternalHooks(cfg, tmpDir);
expect(count).toBe(0);

View File

@@ -1,5 +1,10 @@
import type { AudioTranscriptionRequest, AudioTranscriptionResult } from "../../types.js";
import { assertOkOrThrowHttpError, fetchWithTimeoutGuarded, normalizeBaseUrl } from "../shared.js";
import {
assertOkOrThrowHttpError,
normalizeBaseUrl,
postTranscriptionRequest,
requireTranscriptionText,
} from "../shared.js";
export const DEFAULT_DEEPGRAM_AUDIO_BASE_URL = "https://api.deepgram.com/v1";
export const DEFAULT_DEEPGRAM_AUDIO_MODEL = "nova-3";
@@ -50,26 +55,23 @@ export async function transcribeDeepgramAudio(
}
const body = new Uint8Array(params.buffer);
const { response: res, release } = await fetchWithTimeoutGuarded(
url.toString(),
{
method: "POST",
headers,
body,
},
params.timeoutMs,
const { response: res, release } = await postTranscriptionRequest({
url: url.toString(),
headers,
body,
timeoutMs: params.timeoutMs,
fetchFn,
allowPrivate ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined,
);
allowPrivateNetwork: allowPrivate,
});
try {
await assertOkOrThrowHttpError(res, "Audio transcription failed");
const payload = (await res.json()) as DeepgramTranscriptResponse;
const transcript = payload.results?.channels?.[0]?.alternatives?.[0]?.transcript?.trim();
if (!transcript) {
throw new Error("Audio transcription response missing transcript");
}
const transcript = requireTranscriptionText(
payload.results?.channels?.[0]?.alternatives?.[0]?.transcript,
"Audio transcription response missing transcript",
);
return { text: transcript, model };
} finally {
await release();

View File

@@ -1,20 +1,11 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as ssrf from "../../../infra/net/ssrf.js";
import { withFetchPreconnect } from "../../../test-utils/fetch-mock.js";
import { createRequestCaptureJsonFetch } from "../audio.test-helpers.js";
import { describeGeminiVideo } from "./video.js";
const TEST_NET_IP = "203.0.113.10";
const resolveRequestUrl = (input: RequestInfo | URL) => {
if (typeof input === "string") {
return input;
}
if (input instanceof URL) {
return input.toString();
}
return input.url;
};
function stubPinnedHostname(hostname: string) {
const normalized = hostname.trim().toLowerCase().replace(/\.$/, "");
const addresses = [TEST_NET_IP];
@@ -73,23 +64,14 @@ describe("describeGeminiVideo", () => {
});
it("builds the expected request payload", async () => {
let seenUrl: string | null = null;
let seenInit: RequestInit | undefined;
const fetchFn = withFetchPreconnect(async (input: RequestInfo | URL, init?: RequestInit) => {
seenUrl = resolveRequestUrl(input);
seenInit = init;
return new Response(
JSON.stringify({
candidates: [
{
content: {
parts: [{ text: "first" }, { text: " second " }, { text: "" }],
},
},
],
}),
{ status: 200, headers: { "content-type": "application/json" } },
);
const { fetchFn, getRequest } = createRequestCaptureJsonFetch({
candidates: [
{
content: {
parts: [{ text: "first" }, { text: " second " }, { text: "" }],
},
},
],
});
const result = await describeGeminiVideo({
@@ -102,6 +84,7 @@ describe("describeGeminiVideo", () => {
headers: { "X-Other": "1" },
fetchFn,
});
const { url: seenUrl, init: seenInit } = getRequest();
expect(result.model).toBe("gemini-3-pro-preview");
expect(result.text).toBe("first\nsecond");

View File

@@ -1,6 +1,11 @@
import path from "node:path";
import type { AudioTranscriptionRequest, AudioTranscriptionResult } from "../../types.js";
import { assertOkOrThrowHttpError, fetchWithTimeoutGuarded, normalizeBaseUrl } from "../shared.js";
import {
assertOkOrThrowHttpError,
normalizeBaseUrl,
postTranscriptionRequest,
requireTranscriptionText,
} from "../shared.js";
export const DEFAULT_OPENAI_AUDIO_BASE_URL = "https://api.openai.com/v1";
const DEFAULT_OPENAI_AUDIO_MODEL = "gpt-4o-mini-transcribe";
@@ -39,26 +44,23 @@ export async function transcribeOpenAiCompatibleAudio(
headers.set("authorization", `Bearer ${params.apiKey}`);
}
const { response: res, release } = await fetchWithTimeoutGuarded(
const { response: res, release } = await postTranscriptionRequest({
url,
{
method: "POST",
headers,
body: form,
},
params.timeoutMs,
headers,
body: form,
timeoutMs: params.timeoutMs,
fetchFn,
allowPrivate ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined,
);
allowPrivateNetwork: allowPrivate,
});
try {
await assertOkOrThrowHttpError(res, "Audio transcription failed");
const payload = (await res.json()) as { text?: string };
const text = payload.text?.trim();
if (!text) {
throw new Error("Audio transcription response missing text");
}
const text = requireTranscriptionText(
payload.text,
"Audio transcription response missing text",
);
return { text, model };
} finally {
await release();

View File

@@ -32,6 +32,27 @@ export async function fetchWithTimeoutGuarded(
});
}
export async function postTranscriptionRequest(params: {
url: string;
headers: Headers;
body: BodyInit;
timeoutMs: number;
fetchFn: typeof fetch;
allowPrivateNetwork?: boolean;
}) {
return fetchWithTimeoutGuarded(
params.url,
{
method: "POST",
headers: params.headers,
body: params.body,
},
params.timeoutMs,
params.fetchFn,
params.allowPrivateNetwork ? { ssrfPolicy: { allowPrivateNetwork: true } } : undefined,
);
}
export async function readErrorResponse(res: Response): Promise<string | undefined> {
try {
const text = await res.text();
@@ -56,3 +77,14 @@ export async function assertOkOrThrowHttpError(res: Response, label: string): Pr
const suffix = detail ? `: ${detail}` : "";
throw new Error(`${label} (HTTP ${res.status})${suffix}`);
}
export function requireTranscriptionText(
value: string | undefined,
missingMessage: string,
): string {
const text = value?.trim();
if (!text) {
throw new Error(missingMessage);
}
return text;
}

View File

@@ -9,11 +9,11 @@ const { spawnWithFallbackMock, killProcessTreeMock } = vi.hoisted(() => ({
}));
vi.mock("../../spawn-utils.js", () => ({
spawnWithFallback: (...args: unknown[]) => spawnWithFallbackMock(...args),
spawnWithFallback: spawnWithFallbackMock,
}));
vi.mock("../../kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
killProcessTree: killProcessTreeMock,
}));
let createChildAdapter: typeof import("./child.js").createChildAdapter;

View File

@@ -31,6 +31,11 @@ function createStubPty(pid = 1234) {
};
}
function expectSpawnEnv() {
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
return spawnOptions?.env;
}
describe("createPtyAdapter", () => {
let createPtyAdapter: typeof import("./pty.js").createPtyAdapter;
@@ -152,8 +157,7 @@ describe("createPtyAdapter", () => {
args: ["-lc", "env"],
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toBeUndefined();
expect(expectSpawnEnv()).toBeUndefined();
});
it("passes explicit env overrides as strings", async () => {
@@ -166,8 +170,7 @@ describe("createPtyAdapter", () => {
env: { FOO: "bar", COUNT: "12", DROP_ME: undefined },
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toEqual({ FOO: "bar", COUNT: "12" });
expect(expectSpawnEnv()).toEqual({ FOO: "bar", COUNT: "12" });
});
it("does not pass a signal to node-pty on Windows", async () => {

View File

@@ -1,19 +1,35 @@
import { describe, expect, it } from "vitest";
import { createRunRegistry } from "./registry.js";
type RunRegistry = ReturnType<typeof createRunRegistry>;
function addRunningRecord(
registry: RunRegistry,
params: {
runId: string;
sessionId: string;
startedAtMs: number;
scopeKey?: string;
backendId?: string;
},
) {
registry.add({
runId: params.runId,
sessionId: params.sessionId,
backendId: params.backendId ?? "b1",
scopeKey: params.scopeKey,
state: "running",
startedAtMs: params.startedAtMs,
lastOutputAtMs: params.startedAtMs,
createdAtMs: params.startedAtMs,
updatedAtMs: params.startedAtMs,
});
}
describe("process supervisor run registry", () => {
it("finalize is idempotent and preserves first terminal metadata", () => {
const registry = createRunRegistry();
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
addRunningRecord(registry, { runId: "r1", sessionId: "s1", startedAtMs: 1 });
const first = registry.finalize("r1", {
reason: "overall-timeout",
@@ -41,36 +57,9 @@ describe("process supervisor run registry", () => {
it("prunes oldest exited records once retention cap is exceeded", () => {
const registry = createRunRegistry({ maxExitedRecords: 2 });
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
registry.add({
runId: "r2",
sessionId: "s2",
backendId: "b1",
state: "running",
startedAtMs: 2,
lastOutputAtMs: 2,
createdAtMs: 2,
updatedAtMs: 2,
});
registry.add({
runId: "r3",
sessionId: "s3",
backendId: "b1",
state: "running",
startedAtMs: 3,
lastOutputAtMs: 3,
createdAtMs: 3,
updatedAtMs: 3,
});
addRunningRecord(registry, { runId: "r1", sessionId: "s1", startedAtMs: 1 });
addRunningRecord(registry, { runId: "r2", sessionId: "s2", startedAtMs: 2 });
addRunningRecord(registry, { runId: "r3", sessionId: "s3", startedAtMs: 3 });
registry.finalize("r1", { reason: "exit", exitCode: 0, exitSignal: null });
registry.finalize("r2", { reason: "exit", exitCode: 0, exitSignal: null });
@@ -80,4 +69,32 @@ describe("process supervisor run registry", () => {
expect(registry.get("r2")?.state).toBe("exited");
expect(registry.get("r3")?.state).toBe("exited");
});
it("filters listByScope and returns detached copies", () => {
const registry = createRunRegistry();
addRunningRecord(registry, {
runId: "r1",
sessionId: "s1",
scopeKey: "scope:a",
startedAtMs: 1,
});
addRunningRecord(registry, {
runId: "r2",
sessionId: "s2",
scopeKey: "scope:b",
startedAtMs: 2,
});
expect(registry.listByScope(" ")).toEqual([]);
const scoped = registry.listByScope("scope:a");
expect(scoped).toHaveLength(1);
const [firstScoped] = scoped;
expect(firstScoped?.runId).toBe("r1");
if (!firstScoped) {
throw new Error("missing scoped record");
}
firstScoped.state = "exited";
expect(registry.get("r1")?.state).toBe("running");
});
});

View File

@@ -1,13 +1,23 @@
import { describe, expect, it } from "vitest";
import { createProcessSupervisor } from "./supervisor.js";
type ProcessSupervisor = ReturnType<typeof createProcessSupervisor>;
type SpawnOptions = Parameters<ProcessSupervisor["spawn"]>[0];
type ChildSpawnOptions = Omit<Extract<SpawnOptions, { mode: "child" }>, "backendId" | "mode">;
async function spawnChild(supervisor: ProcessSupervisor, options: ChildSpawnOptions) {
return supervisor.spawn({
...options,
backendId: "test",
mode: "child",
});
}
describe("process supervisor", () => {
it("spawns child runs and captures output", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
const run = await spawnChild(supervisor, {
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("ok")'],
timeoutMs: 2_500,
stdinMode: "pipe-closed",
@@ -20,10 +30,8 @@ describe("process supervisor", () => {
it("enforces no-output timeout for silent processes", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
const run = await spawnChild(supervisor, {
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"],
timeoutMs: 1_000,
noOutputTimeoutMs: 20,
@@ -37,22 +45,18 @@ describe("process supervisor", () => {
it("cancels prior scoped run when replaceExistingScope is enabled", async () => {
const supervisor = createProcessSupervisor();
const first = await supervisor.spawn({
const first = await spawnChild(supervisor, {
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"],
timeoutMs: 1_000,
stdinMode: "pipe-open",
});
const second = await supervisor.spawn({
const second = await spawnChild(supervisor, {
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
replaceExistingScope: true,
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("new")'],
timeoutMs: 2_500,
stdinMode: "pipe-closed",
@@ -67,10 +71,8 @@ describe("process supervisor", () => {
it("applies overall timeout even for near-immediate timer firing", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
const run = await spawnChild(supervisor, {
sessionId: "s-timeout",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 60)"],
timeoutMs: 1,
stdinMode: "pipe-closed",
@@ -83,10 +85,8 @@ describe("process supervisor", () => {
it("can stream output without retaining it in RunExit payload", async () => {
const supervisor = createProcessSupervisor();
let streamed = "";
const run = await supervisor.spawn({
const run = await spawnChild(supervisor, {
sessionId: "s-capture",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("streamed")'],
timeoutMs: 2_500,
stdinMode: "pipe-closed",

View File

@@ -99,6 +99,11 @@ type SlackModalEventBase = {
};
type SlackModalInteractionKind = "view_submission" | "view_closed";
type SlackModalEventHandlerArgs = { ack: () => Promise<void>; body: unknown };
type RegisterSlackModalHandler = (
matcher: RegExp,
handler: (args: SlackModalEventHandlerArgs) => Promise<void>,
) => void;
function readOptionValues(options: unknown): string[] | undefined {
if (!Array.isArray(options)) {
@@ -483,6 +488,24 @@ function emitSlackModalLifecycleEvent(params: {
});
}
function registerModalLifecycleHandler(params: {
register: RegisterSlackModalHandler;
matcher: RegExp;
ctx: SlackMonitorContext;
interactionType: SlackModalInteractionKind;
contextPrefix: "slack:interaction:view" | "slack:interaction:view-closed";
}) {
params.register(params.matcher, async ({ ack, body }: SlackModalEventHandlerArgs) => {
await ack();
emitSlackModalLifecycleEvent({
ctx: params.ctx,
body: body as SlackModalBody,
interactionType: params.interactionType,
contextPrefix: params.contextPrefix,
});
});
}
export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContext }) {
const { ctx } = params;
if (typeof ctx.app.action !== "function") {
@@ -646,27 +669,20 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex
if (typeof ctx.app.view !== "function") {
return;
}
const modalMatcher = new RegExp(`^${OPENCLAW_ACTION_PREFIX}`);
// Handle OpenClaw modal submissions with callback_ids scoped by our prefix.
ctx.app.view(
new RegExp(`^${OPENCLAW_ACTION_PREFIX}`),
async ({ ack, body }: { ack: () => Promise<void>; body: unknown }) => {
await ack();
emitSlackModalLifecycleEvent({
ctx,
body: body as SlackModalBody,
interactionType: "view_submission",
contextPrefix: "slack:interaction:view",
});
},
);
registerModalLifecycleHandler({
register: (matcher, handler) => ctx.app.view(matcher, handler),
matcher: modalMatcher,
ctx,
interactionType: "view_submission",
contextPrefix: "slack:interaction:view",
});
const viewClosed = (
ctx.app as unknown as {
viewClosed?: (
matcher: RegExp,
handler: (args: { ack: () => Promise<void>; body: unknown }) => Promise<void>,
) => void;
viewClosed?: RegisterSlackModalHandler;
}
).viewClosed;
if (typeof viewClosed !== "function") {
@@ -674,16 +690,11 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex
}
// Handle modal close events so agent workflows can react to cancelled forms.
viewClosed(
new RegExp(`^${OPENCLAW_ACTION_PREFIX}`),
async ({ ack, body }: { ack: () => Promise<void>; body: unknown }) => {
await ack();
emitSlackModalLifecycleEvent({
ctx,
body: body as SlackModalBody,
interactionType: "view_closed",
contextPrefix: "slack:interaction:view-closed",
});
},
);
registerModalLifecycleHandler({
register: viewClosed,
matcher: modalMatcher,
ctx,
interactionType: "view_closed",
contextPrefix: "slack:interaction:view-closed",
});
}

View File

@@ -292,17 +292,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
hasStreamedMessage = false;
}
const replyThreadTs = replyPlan.nextThreadTs();
await deliverReplies({
replies: [payload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs,
});
replyPlan.markSent();
await deliverNormally(payload);
},
onError: (err, info) => {
runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`));
@@ -362,6 +352,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
draftStream.update(trimmed);
hasStreamedMessage = true;
};
const onDraftBoundary =
useStreaming || !previewStreamingEnabled
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
};
const { queuedFinal, counts } = await dispatchInboundMessage({
ctx: prepared.ctxPayload,
@@ -384,32 +386,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
onReasoningEnd: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
},
});
await draftStream.flush();

View File

@@ -168,6 +168,19 @@ describe("slack prepareSlackMessage inbound contract", () => {
};
}
function createThreadReplyMessage(overrides: Partial<SlackMessageEvent>): SlackMessageEvent {
return createSlackMessage({
channel: "C123",
channel_type: "channel",
thread_ts: "100.000",
...overrides,
});
}
function prepareThreadMessage(ctx: SlackMonitorContext, overrides: Partial<SlackMessageEvent>) {
return prepareMessageWith(ctx, createThreadAccount(), createThreadReplyMessage(overrides));
}
it("produces a finalized MsgContext", async () => {
const message: SlackMessageEvent = {
channel: "D123",
@@ -298,17 +311,10 @@ describe("slack prepareSlackMessage inbound contract", () => {
});
slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" });
const prepared = await prepareMessageWith(
slackCtx,
createThreadAccount(),
createSlackMessage({
channel: "C123",
channel_type: "channel",
text: "current message",
ts: "101.000",
thread_ts: "100.000",
}),
);
const prepared = await prepareThreadMessage(slackCtx, {
text: "current message",
ts: "101.000",
});
expect(prepared).toBeTruthy();
expect(prepared!.ctxPayload.IsFirstThreadTurn).toBe(true);
@@ -347,17 +353,11 @@ describe("slack prepareSlackMessage inbound contract", () => {
slackCtx.resolveUserName = async () => ({ name: "Alice" });
slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" });
const prepared = await prepareMessageWith(
slackCtx,
createThreadAccount(),
createSlackMessage({
channel: "C123",
channel_type: "channel",
text: "reply in old thread",
ts: "201.000",
thread_ts: "200.000",
}),
);
const prepared = await prepareThreadMessage(slackCtx, {
text: "reply in old thread",
ts: "201.000",
thread_ts: "200.000",
});
expect(prepared).toBeTruthy();
expect(prepared!.ctxPayload.IsFirstThreadTurn).toBeUndefined();

View File

@@ -147,6 +147,13 @@ function parseSlackCommandArgValue(raw?: string | null): {
};
}
function buildSlackArgMenuOptions(choices: EncodedMenuChoice[]) {
return choices.map((choice) => ({
text: { type: "plain_text", text: choice.label.slice(0, 75) },
value: choice.value,
}));
}
function buildSlackCommandArgMenuBlocks(params: {
title: string;
command: string;
@@ -185,10 +192,7 @@ function buildSlackCommandArgMenuBlocks(params: {
type: "overflow",
action_id: SLACK_COMMAND_ARG_ACTION_ID,
confirm: buildSlackArgMenuConfirm({ command: params.command, arg: params.arg }),
options: encodedChoices.map((choice) => ({
text: { type: "plain_text", text: choice.label.slice(0, 75) },
value: choice.value,
})),
options: buildSlackArgMenuOptions(encodedChoices),
},
],
},
@@ -238,10 +242,7 @@ function buildSlackCommandArgMenuBlocks(params: {
text:
index === 0 ? `Choose ${params.arg}` : `Choose ${params.arg} (${index + 1})`,
},
options: choices.map((choice) => ({
text: { type: "plain_text", text: choice.label.slice(0, 75) },
value: choice.value,
})),
options: buildSlackArgMenuOptions(choices),
},
],
}),

View File

@@ -82,6 +82,19 @@ function setupTransientGetFileRetry() {
return getFile;
}
function createFileTooBigError(): Error {
return new Error("GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)");
}
async function expectTransientGetFileRetrySuccess() {
const getFile = setupTransientGetFileRetry();
const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN);
await flushRetryTimers();
const result = await promise;
expect(getFile).toHaveBeenCalledTimes(2);
return result;
}
async function flushRetryTimers() {
await vi.runAllTimersAsync();
}
@@ -98,12 +111,7 @@ describe("resolveMedia getFile retry", () => {
});
it("retries getFile on transient failure and succeeds on second attempt", async () => {
const getFile = setupTransientGetFileRetry();
const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN);
await flushRetryTimers();
const result = await promise;
expect(getFile).toHaveBeenCalledTimes(2);
const result = await expectTransientGetFileRetrySuccess();
expect(result).toEqual(
expect.objectContaining({ path: "/tmp/file_0.oga", placeholder: "<media:audio>" }),
);
@@ -135,15 +143,13 @@ describe("resolveMedia getFile retry", () => {
});
it("does not retry 'file is too big' error (400 Bad Request) and returns null", async () => {
// Simulate Telegram Bot API error when file exceeds 20MB limit
const fileTooBigError = new Error(
"GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)",
);
// Simulate Telegram Bot API error when file exceeds 20MB limit.
const fileTooBigError = createFileTooBigError();
const getFile = vi.fn().mockRejectedValue(fileTooBigError);
const result = await resolveMedia(makeCtx("video", getFile), MAX_MEDIA_BYTES, BOT_TOKEN);
// Should NOT retry - "file is too big" is a permanent error, not transient
// Should NOT retry - "file is too big" is a permanent error, not transient.
expect(getFile).toHaveBeenCalledTimes(1);
expect(result).toBeNull();
});
@@ -166,10 +172,7 @@ describe("resolveMedia getFile retry", () => {
it.each(["audio", "voice"] as const)(
"returns null for %s when file is too big",
async (mediaField) => {
const fileTooBigError = new Error(
"GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)",
);
const getFile = vi.fn().mockRejectedValue(fileTooBigError);
const getFile = vi.fn().mockRejectedValue(createFileTooBigError());
const result = await resolveMedia(makeCtx(mediaField, getFile), MAX_MEDIA_BYTES, BOT_TOKEN);
@@ -178,14 +181,17 @@ describe("resolveMedia getFile retry", () => {
},
);
it("still retries transient errors even after encountering file too big in different call", async () => {
const getFile = setupTransientGetFileRetry();
const promise = resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN);
await flushRetryTimers();
const result = await promise;
it("throws when getFile returns no file_path", async () => {
const getFile = vi.fn().mockResolvedValue({});
await expect(
resolveMedia(makeCtx("voice", getFile), MAX_MEDIA_BYTES, BOT_TOKEN),
).rejects.toThrow("Telegram getFile returned no file_path");
expect(getFile).toHaveBeenCalledTimes(1);
});
// Should retry transient errors
expect(getFile).toHaveBeenCalledTimes(2);
it("still retries transient errors even after encountering file too big in different call", async () => {
const result = await expectTransientGetFileRetrySuccess();
// Should retry transient errors.
expect(result).not.toBeNull();
});
});

View File

@@ -71,6 +71,25 @@ function createSendMessageHarness(messageId = 4) {
return { runtime, sendMessage, bot };
}
function createVoiceMessagesForbiddenError() {
return new Error(
"GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)",
);
}
function createVoiceFailureHarness(params: {
voiceError: Error;
sendMessageResult?: { message_id: number; chat: { id: string } };
}) {
const runtime = createRuntime();
const sendVoice = vi.fn().mockRejectedValue(params.voiceError);
const sendMessage = params.sendMessageResult
? vi.fn().mockResolvedValue(params.sendMessageResult)
: vi.fn();
const bot = createBot({ sendVoice, sendMessage });
return { runtime, sendVoice, sendMessage, bot };
}
describe("deliverReplies", () => {
beforeEach(() => {
loadWebMedia.mockClear();
@@ -258,19 +277,13 @@ describe("deliverReplies", () => {
});
it("falls back to text when sendVoice fails with VOICE_MESSAGES_FORBIDDEN", async () => {
const runtime = createRuntime();
const sendVoice = vi
.fn()
.mockRejectedValue(
new Error(
"GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)",
),
);
const sendMessage = vi.fn().mockResolvedValue({
message_id: 5,
chat: { id: "123" },
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
voiceError: createVoiceMessagesForbiddenError(),
sendMessageResult: {
message_id: 5,
chat: { id: "123" },
},
});
const bot = createBot({ sendVoice, sendMessage });
mockMediaLoad("note.ogg", "audio/ogg", "voice");
@@ -315,16 +328,9 @@ describe("deliverReplies", () => {
});
it("rethrows VOICE_MESSAGES_FORBIDDEN when no text fallback is available", async () => {
const runtime = createRuntime();
const sendVoice = vi
.fn()
.mockRejectedValue(
new Error(
"GrammyError: Call to 'sendVoice' failed! (400: Bad Request: VOICE_MESSAGES_FORBIDDEN)",
),
);
const sendMessage = vi.fn();
const bot = createBot({ sendVoice, sendMessage });
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
voiceError: createVoiceMessagesForbiddenError(),
});
mockMediaLoad("note.ogg", "audio/ogg", "voice");

View File

@@ -52,6 +52,18 @@ function mockFirstSendMediaFailure(msg: WebInboundMsg, message: string) {
).mockRejectedValueOnce(new Error(message));
}
function mockFirstReplyFailure(msg: WebInboundMsg, message: string) {
(msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce(
new Error(message),
);
}
function mockSecondReplySuccess(msg: WebInboundMsg) {
(msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce(
undefined,
);
}
const replyLogger = {
info: vi.fn(),
warn: vi.fn(),
@@ -76,60 +88,31 @@ describe("deliverWebReply", () => {
expect(replyLogger.info).toHaveBeenCalledWith(expect.any(Object), "auto-reply sent (text)");
});
it("retries text send on transient failure", async () => {
const msg = makeMsg();
(msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce(
new Error("connection closed"),
);
(msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce(
undefined,
);
it.each(["connection closed", "operation timed out"])(
"retries text send on transient failure: %s",
async (errorMessage) => {
const msg = makeMsg();
mockFirstReplyFailure(msg, errorMessage);
mockSecondReplySuccess(msg);
await deliverWebReply({
replyResult: { text: "hi" },
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 200,
replyLogger,
skipLog: true,
});
await deliverWebReply({
replyResult: { text: "hi" },
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 200,
replyLogger,
skipLog: true,
});
expect(msg.reply).toHaveBeenCalledTimes(2);
expect(sleep).toHaveBeenCalledWith(500);
});
it("retries text send when error contains timed out", async () => {
const msg = makeMsg();
(msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce(
new Error("operation timed out"),
);
(msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce(
undefined,
);
await deliverWebReply({
replyResult: { text: "hi" },
msg,
maxMediaBytes: 1024 * 1024,
textLimit: 200,
replyLogger,
skipLog: true,
});
expect(msg.reply).toHaveBeenCalledTimes(2);
expect(sleep).toHaveBeenCalledWith(500);
});
expect(msg.reply).toHaveBeenCalledTimes(2);
expect(sleep).toHaveBeenCalledWith(500);
},
);
it("sends image media with caption and then remaining text", async () => {
const msg = makeMsg();
const mediaLocalRoots = ["/tmp/workspace-work"];
(
loadWebMedia as unknown as { mockResolvedValueOnce: (v: unknown) => void }
).mockResolvedValueOnce({
buffer: Buffer.from("img"),
contentType: "image/jpeg",
kind: "image",
});
mockLoadedImageMedia();
await deliverWebReply({
replyResult: { text: "aaaaaa", mediaUrl: "http://example.com/img.jpg" },

View File

@@ -95,6 +95,13 @@ describe("runWebHeartbeatOnce", () => {
let replyResolver: typeof getReplyFromConfig;
const getModules = async () => await import("./heartbeat-runner.js");
const buildRunArgs = (overrides: Record<string, unknown> = {}) => ({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
...overrides,
});
beforeEach(() => {
state.visibility = { showAlerts: true, showOk: true, useIndicator: false };
@@ -117,26 +124,14 @@ describe("runWebHeartbeatOnce", () => {
it("supports manual override body dry-run without sending", async () => {
const { runWebHeartbeatOnce } = await getModules();
await runWebHeartbeatOnce({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
overrideBody: "hello",
dryRun: true,
});
await runWebHeartbeatOnce(buildRunArgs({ overrideBody: "hello", dryRun: true }));
expect(senderMock).not.toHaveBeenCalled();
expect(state.events).toHaveLength(0);
});
it("sends HEARTBEAT_OK when reply is empty and showOk is enabled", async () => {
const { runWebHeartbeatOnce } = await getModules();
await runWebHeartbeatOnce({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
});
await runWebHeartbeatOnce(buildRunArgs());
expect(senderMock).toHaveBeenCalledWith("+123", HEARTBEAT_TOKEN, { verbose: false });
expect(state.events).toEqual(
expect.arrayContaining([expect.objectContaining({ status: "ok-empty", silent: false })]),
@@ -145,13 +140,12 @@ describe("runWebHeartbeatOnce", () => {
it("injects a cron-style Current time line into the heartbeat prompt", async () => {
const { runWebHeartbeatOnce } = await getModules();
await runWebHeartbeatOnce({
cfg: { agents: { defaults: { heartbeat: { prompt: "Ops check" } } }, session: {} } as never,
to: "+123",
sender,
replyResolver,
dryRun: true,
});
await runWebHeartbeatOnce(
buildRunArgs({
cfg: { agents: { defaults: { heartbeat: { prompt: "Ops check" } } }, session: {} } as never,
dryRun: true,
}),
);
expect(replyResolver).toHaveBeenCalledTimes(1);
const ctx = replyResolverMock.mock.calls[0]?.[0];
expect(ctx?.Body).toContain("Ops check");
@@ -161,12 +155,7 @@ describe("runWebHeartbeatOnce", () => {
it("treats heartbeat token-only replies as ok-token and preserves session updatedAt", async () => {
replyResolverMock.mockResolvedValue({ text: HEARTBEAT_TOKEN });
const { runWebHeartbeatOnce } = await getModules();
await runWebHeartbeatOnce({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
});
await runWebHeartbeatOnce(buildRunArgs());
expect(state.store.k?.updatedAt).toBe(123);
expect(senderMock).toHaveBeenCalledWith("+123", HEARTBEAT_TOKEN, { verbose: false });
expect(state.events).toEqual(
@@ -178,12 +167,7 @@ describe("runWebHeartbeatOnce", () => {
state.visibility = { showAlerts: false, showOk: true, useIndicator: true };
replyResolverMock.mockResolvedValue({ text: "ALERT" });
const { runWebHeartbeatOnce } = await getModules();
await runWebHeartbeatOnce({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
});
await runWebHeartbeatOnce(buildRunArgs());
expect(senderMock).not.toHaveBeenCalled();
expect(state.events).toEqual(
expect.arrayContaining([
@@ -196,14 +180,7 @@ describe("runWebHeartbeatOnce", () => {
replyResolverMock.mockResolvedValue({ text: "ALERT" });
senderMock.mockRejectedValueOnce(new Error("nope"));
const { runWebHeartbeatOnce } = await getModules();
await expect(
runWebHeartbeatOnce({
cfg: { agents: { defaults: {} }, session: {} } as never,
to: "+123",
sender,
replyResolver,
}),
).rejects.toThrow("nope");
await expect(runWebHeartbeatOnce(buildRunArgs())).rejects.toThrow("nope");
expect(state.events).toEqual(
expect.arrayContaining([
expect.objectContaining({ status: "failed", reason: "ERR:Error: nope" }),

View File

@@ -19,6 +19,22 @@ export type GroupHistoryEntry = {
senderJid?: string;
};
type ApplyGroupGatingParams = {
cfg: ReturnType<typeof loadConfig>;
msg: WebInboundMsg;
conversationId: string;
groupHistoryKey: string;
agentId: string;
sessionKey: string;
baseMentionConfig: MentionConfig;
authDir?: string;
groupHistories: Map<string, GroupHistoryEntry[]>;
groupHistoryLimit: number;
groupMemberNames: Map<string, Map<string, string>>;
logVerbose: (msg: string) => void;
replyLogger: { debug: (obj: unknown, msg: string) => void };
};
function isOwnerSender(baseMentionConfig: MentionConfig, msg: WebInboundMsg) {
const sender = normalizeE164(msg.senderE164 ?? "");
if (!sender) {
@@ -52,21 +68,18 @@ function recordPendingGroupHistoryEntry(params: {
});
}
export function applyGroupGating(params: {
cfg: ReturnType<typeof loadConfig>;
msg: WebInboundMsg;
conversationId: string;
groupHistoryKey: string;
agentId: string;
sessionKey: string;
baseMentionConfig: MentionConfig;
authDir?: string;
groupHistories: Map<string, GroupHistoryEntry[]>;
groupHistoryLimit: number;
groupMemberNames: Map<string, Map<string, string>>;
logVerbose: (msg: string) => void;
replyLogger: { debug: (obj: unknown, msg: string) => void };
}) {
function skipGroupMessageAndStoreHistory(params: ApplyGroupGatingParams, verboseMessage: string) {
params.logVerbose(verboseMessage);
recordPendingGroupHistoryEntry({
msg: params.msg,
groupHistories: params.groupHistories,
groupHistoryKey: params.groupHistoryKey,
groupHistoryLimit: params.groupHistoryLimit,
});
return { shouldProcess: false } as const;
}
export function applyGroupGating(params: ApplyGroupGatingParams) {
const groupPolicy = resolveGroupPolicyFor(params.cfg, params.conversationId);
if (groupPolicy.allowlistEnabled && !groupPolicy.allowed) {
params.logVerbose(`Skipping group message ${params.conversationId} (not in allowlist)`);
@@ -91,14 +104,10 @@ export function applyGroupGating(params: {
const shouldBypassMention = owner && hasControlCommand(commandBody, params.cfg);
if (activationCommand.hasCommand && !owner) {
params.logVerbose(`Ignoring /activation from non-owner in group ${params.conversationId}`);
recordPendingGroupHistoryEntry({
msg: params.msg,
groupHistories: params.groupHistories,
groupHistoryKey: params.groupHistoryKey,
groupHistoryLimit: params.groupHistoryLimit,
});
return { shouldProcess: false };
return skipGroupMessageAndStoreHistory(
params,
`Ignoring /activation from non-owner in group ${params.conversationId}`,
);
}
const mentionDebug = debugMention(params.msg, mentionConfig, params.authDir);
@@ -137,16 +146,10 @@ export function applyGroupGating(params: {
});
params.msg.wasMentioned = mentionGate.effectiveWasMentioned;
if (!shouldBypassMention && requireMention && mentionGate.shouldSkip) {
params.logVerbose(
return skipGroupMessageAndStoreHistory(
params,
`Group message stored for context (no mention detected) in ${params.conversationId}: ${params.msg.body}`,
);
recordPendingGroupHistoryEntry({
msg: params.msg,
groupHistories: params.groupHistories,
groupHistoryKey: params.groupHistoryKey,
groupHistoryLimit: params.groupHistoryLimit,
});
return { shouldProcess: false };
}
return { shouldProcess: true };

View File

@@ -0,0 +1,56 @@
import { describe, expect, it } from "vitest";
import { formatGroupMembers, noteGroupMember } from "./group-members.js";
describe("noteGroupMember", () => {
it("normalizes member phone numbers before storing", () => {
const groupMemberNames = new Map<string, Map<string, string>>();
noteGroupMember(groupMemberNames, "g1", "+1 (555) 123-4567", "Alice");
expect(groupMemberNames.get("g1")?.get("+15551234567")).toBe("Alice");
});
it("ignores incomplete member values", () => {
const groupMemberNames = new Map<string, Map<string, string>>();
noteGroupMember(groupMemberNames, "g1", undefined, "Alice");
noteGroupMember(groupMemberNames, "g1", "+15551234567", undefined);
expect(groupMemberNames.get("g1")).toBeUndefined();
});
});
describe("formatGroupMembers", () => {
it("deduplicates participants and appends named roster members", () => {
const roster = new Map<string, string>([
["+16660000000", "Bob"],
["+17770000000", "Carol"],
]);
const formatted = formatGroupMembers({
participants: ["+1 (555) 000-0000", "+15550000000", "+16660000000"],
roster,
});
expect(formatted).toBe("+15550000000, Bob (+16660000000), Carol (+17770000000)");
});
it("falls back to sender when no participants or roster are available", () => {
const formatted = formatGroupMembers({
participants: [],
roster: undefined,
fallbackE164: "+1 (555) 222-3333",
});
expect(formatted).toBe("+15552223333");
});
it("returns undefined when no members can be resolved", () => {
expect(
formatGroupMembers({
participants: [],
roster: undefined,
}),
).toBeUndefined();
});
});

View File

@@ -1,5 +1,16 @@
import { normalizeE164 } from "../../../utils.js";
function appendNormalizedUnique(entries: Iterable<string>, seen: Set<string>, ordered: string[]) {
for (const entry of entries) {
const normalized = normalizeE164(entry) ?? entry;
if (!normalized || seen.has(normalized)) {
continue;
}
seen.add(normalized);
ordered.push(normalized);
}
}
export function noteGroupMember(
groupMemberNames: Map<string, Map<string, string>>,
conversationId: string,
@@ -31,27 +42,10 @@ export function formatGroupMembers(params: {
const seen = new Set<string>();
const ordered: string[] = [];
if (participants?.length) {
for (const entry of participants) {
if (!entry) {
continue;
}
const normalized = normalizeE164(entry) ?? entry;
if (!normalized || seen.has(normalized)) {
continue;
}
seen.add(normalized);
ordered.push(normalized);
}
appendNormalizedUnique(participants, seen, ordered);
}
if (roster) {
for (const entry of roster.keys()) {
const normalized = normalizeE164(entry) ?? entry;
if (!normalized || seen.has(normalized)) {
continue;
}
seen.add(normalized);
ordered.push(normalized);
}
appendNormalizedUnique(roster.keys(), seen, ordered);
}
if (ordered.length === 0 && fallbackE164) {
const normalized = normalizeE164(fallbackE164) ?? fallbackE164;

View File

@@ -83,6 +83,17 @@ function createGroupMessage(overrides: Record<string, unknown> = {}) {
};
}
function makeOwnerGroupConfig() {
return makeConfig({
channels: {
whatsapp: {
allowFrom: ["+111"],
groups: { "*": { requireMention: true } },
},
},
});
}
function makeInboundCfg(messagePrefix = "") {
return {
agents: { defaults: { workspace: "/tmp/openclaw" } },
@@ -114,21 +125,15 @@ describe("applyGroupGating", () => {
expect(result.shouldProcess).toBe(true);
});
it("bypasses mention gating for owner /new in group chats", () => {
const cfg = makeConfig({
channels: {
whatsapp: {
allowFrom: ["+111"],
groups: { "*": { requireMention: true } },
},
},
});
it.each([
{ id: "g-new", command: "/new" },
{ id: "g-status", command: "/status" },
])("bypasses mention gating for owner $command in group chats", ({ id, command }) => {
const { result } = runGroupGating({
cfg,
cfg: makeOwnerGroupConfig(),
msg: createGroupMessage({
id: "g-new",
body: "/new",
id,
body: command,
senderE164: "+111",
senderName: "Owner",
}),
@@ -161,29 +166,6 @@ describe("applyGroupGating", () => {
expect(groupHistories.get("whatsapp:default:group:123@g.us")?.length).toBe(1);
});
it("bypasses mention gating for owner /status in group chats", () => {
const cfg = makeConfig({
channels: {
whatsapp: {
allowFrom: ["+111"],
groups: { "*": { requireMention: true } },
},
},
});
const { result } = runGroupGating({
cfg,
msg: createGroupMessage({
id: "g-status",
body: "/status",
senderE164: "+111",
senderName: "Owner",
}),
});
expect(result.shouldProcess).toBe(true);
});
it("uses per-agent mention patterns for group gating (routing + mentionPatterns)", () => {
const cfg = makeConfig({
channels: {

View File

@@ -17,6 +17,12 @@ const mockSock = {
logger: { child: () => ({}) },
} as never;
async function expectMimetype(message: Record<string, unknown>, expected: string) {
const result = await downloadInboundMedia({ message } as never, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe(expected);
}
describe("downloadInboundMedia", () => {
it("returns undefined for messages without media", async () => {
const msg = { message: { conversation: "hello" } } as never;
@@ -25,66 +31,26 @@ describe("downloadInboundMedia", () => {
});
it("uses explicit mimetype from audioMessage when present", async () => {
const msg = {
message: { audioMessage: { mimetype: "audio/mp4", ptt: true } },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("audio/mp4");
await expectMimetype({ audioMessage: { mimetype: "audio/mp4", ptt: true } }, "audio/mp4");
});
it("defaults to audio/ogg for voice messages without explicit MIME", async () => {
const msg = {
message: { audioMessage: { ptt: true } },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("audio/ogg; codecs=opus");
});
it("defaults to audio/ogg for audio messages without MIME or ptt flag", async () => {
const msg = {
message: { audioMessage: {} },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("audio/ogg; codecs=opus");
it.each([
{ name: "voice messages without explicit MIME", audioMessage: { ptt: true } },
{ name: "audio messages without MIME or ptt flag", audioMessage: {} },
])("defaults to audio/ogg for $name", async ({ audioMessage }) => {
await expectMimetype({ audioMessage }, "audio/ogg; codecs=opus");
});
it("uses explicit mimetype from imageMessage when present", async () => {
const msg = {
message: { imageMessage: { mimetype: "image/png" } },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("image/png");
await expectMimetype({ imageMessage: { mimetype: "image/png" } }, "image/png");
});
it("defaults to image/jpeg for images without explicit MIME", async () => {
const msg = {
message: { imageMessage: {} },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("image/jpeg");
});
it("defaults to video/mp4 for video messages without explicit MIME", async () => {
const msg = {
message: { videoMessage: {} },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("video/mp4");
});
it("defaults to image/webp for sticker messages without explicit MIME", async () => {
const msg = {
message: { stickerMessage: {} },
} as never;
const result = await downloadInboundMedia(msg, mockSock);
expect(result).toBeDefined();
expect(result?.mimetype).toBe("image/webp");
it.each([
{ name: "image", message: { imageMessage: {} }, mimetype: "image/jpeg" },
{ name: "video", message: { videoMessage: {} }, mimetype: "video/mp4" },
{ name: "sticker", message: { stickerMessage: {} }, mimetype: "image/webp" },
])("defaults MIME for $name messages without explicit MIME", async ({ message, mimetype }) => {
await expectMimetype(message, mimetype);
});
it("preserves fileName from document messages", async () => {