From 58309fd8d90d69e48754ce8795218dab4c129f27 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 24 Feb 2026 23:37:45 +0000 Subject: [PATCH] refactor(matrix,tests): extract helpers and inject send-queue timing --- .../matrix/src/matrix/monitor/events.ts | 49 ++++++++++--------- .../matrix/src/matrix/send-queue.test.ts | 47 ++++++++++++++---- extensions/matrix/src/matrix/send-queue.ts | 17 +++++-- src/agents/sandbox/fs-bridge.test.ts | 34 +++++++++---- .../monitor/message-handler.process.test.ts | 7 ++- 5 files changed, 108 insertions(+), 46 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 1f64f9558..279517d52 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -5,6 +5,32 @@ import { sendReadReceiptMatrix } from "../send.js"; import type { MatrixRawEvent } from "./types.js"; import { EventType } from "./types.js"; +function createSelfUserIdResolver(client: Pick) { + let selfUserId: string | undefined; + let selfUserIdLookup: Promise | undefined; + + return async (): Promise => { + if (selfUserId) { + return selfUserId; + } + if (!selfUserIdLookup) { + selfUserIdLookup = client + .getUserId() + .then((userId) => { + selfUserId = userId; + return userId; + }) + .catch(() => undefined) + .finally(() => { + if (!selfUserId) { + selfUserIdLookup = undefined; + } + }); + } + return await selfUserIdLookup; + }; +} + export function registerMatrixMonitorEvents(params: { client: MatrixClient; auth: MatrixAuth; @@ -26,28 +52,7 @@ export function registerMatrixMonitorEvents(params: { onRoomMessage, } = params; - let selfUserId: string | undefined; - let selfUserIdLookup: Promise | undefined; - const resolveSelfUserId = async (): Promise => { - if (selfUserId) { - return selfUserId; - } - if (!selfUserIdLookup) { - selfUserIdLookup = client - .getUserId() - .then((userId) => { - selfUserId = userId; - return userId; - }) - .catch(() => undefined) - .finally(() => { - if (!selfUserId) { - selfUserIdLookup = undefined; - } - }); - } - return await selfUserIdLookup; - }; + const resolveSelfUserId = createSelfUserIdResolver(client); client.on("room.message", (roomId: string, event: MatrixRawEvent) => { const eventId = event?.event_id; const senderId = event?.sender; diff --git a/extensions/matrix/src/matrix/send-queue.test.ts b/extensions/matrix/src/matrix/send-queue.test.ts index 508a01d30..bc90c5f50 100644 --- a/extensions/matrix/src/matrix/send-queue.test.ts +++ b/extensions/matrix/src/matrix/send-queue.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { enqueueSend } from "./send-queue.js"; +import { DEFAULT_SEND_GAP_MS, enqueueSend } from "./send-queue.js"; function deferred() { let resolve!: (value: T | PromiseLike) => void; @@ -36,15 +36,15 @@ describe("enqueueSend", () => { return "two"; }); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); expect(events).toEqual(["start1"]); - await vi.advanceTimersByTimeAsync(300); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS * 2); expect(events).toEqual(["start1"]); gate.resolve(); await first; - await vi.advanceTimersByTimeAsync(149); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS - 1); expect(events).toEqual(["start1", "end1"]); await vi.advanceTimersByTimeAsync(1); await second; @@ -63,7 +63,7 @@ describe("enqueueSend", () => { return "b"; }); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); await Promise.all([a, b]); expect(events.sort()).toEqual(["a", "b"]); }); @@ -76,14 +76,14 @@ describe("enqueueSend", () => { (error) => ({ ok: false as const, error }), ); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); const firstResult = await first; expect(firstResult.ok).toBe(false); expect(firstResult.error).toBeInstanceOf(Error); expect((firstResult.error as Error).message).toBe("boom"); const second = enqueueSend("!room:example.org", async () => "ok"); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); await expect(second).resolves.toBe("ok"); }); @@ -104,7 +104,7 @@ describe("enqueueSend", () => { return "two"; }); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); expect(events).toEqual(["start1"]); gate.resolve(); @@ -112,8 +112,37 @@ describe("enqueueSend", () => { expect(firstResult.ok).toBe(false); expect(firstResult.error).toBeInstanceOf(Error); - await vi.advanceTimersByTimeAsync(150); + await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS); await expect(second).resolves.toBe("two"); expect(events).toEqual(["start1", "start2"]); }); + + it("supports custom gap and delay injection", async () => { + const events: string[] = []; + const delayFn = vi.fn(async (_ms: number) => {}); + + const first = enqueueSend( + "!room:example.org", + async () => { + events.push("first"); + return "one"; + }, + { gapMs: 7, delayFn }, + ); + const second = enqueueSend( + "!room:example.org", + async () => { + events.push("second"); + return "two"; + }, + { gapMs: 7, delayFn }, + ); + + await expect(first).resolves.toBe("one"); + await expect(second).resolves.toBe("two"); + expect(events).toEqual(["first", "second"]); + expect(delayFn).toHaveBeenCalledTimes(2); + expect(delayFn).toHaveBeenNthCalledWith(1, 7); + expect(delayFn).toHaveBeenNthCalledWith(2, 7); + }); }); diff --git a/extensions/matrix/src/matrix/send-queue.ts b/extensions/matrix/src/matrix/send-queue.ts index 0d5e43b40..daf5e4093 100644 --- a/extensions/matrix/src/matrix/send-queue.ts +++ b/extensions/matrix/src/matrix/send-queue.ts @@ -1,15 +1,26 @@ -const SEND_GAP_MS = 150; +export const DEFAULT_SEND_GAP_MS = 150; + +type MatrixSendQueueOptions = { + gapMs?: number; + delayFn?: (ms: number) => Promise; +}; // Serialize sends per room to preserve Matrix delivery order. const roomQueues = new Map>(); -export async function enqueueSend(roomId: string, fn: () => Promise): Promise { +export async function enqueueSend( + roomId: string, + fn: () => Promise, + options?: MatrixSendQueueOptions, +): Promise { + const gapMs = options?.gapMs ?? DEFAULT_SEND_GAP_MS; + const delayFn = options?.delayFn ?? delay; const previous = roomQueues.get(roomId) ?? Promise.resolve(); const next = previous .catch(() => {}) .then(async () => { - await delay(SEND_GAP_MS); + await delayFn(gapMs); return await fn(); }); diff --git a/src/agents/sandbox/fs-bridge.test.ts b/src/agents/sandbox/fs-bridge.test.ts index a4ae727b3..98744f356 100644 --- a/src/agents/sandbox/fs-bridge.test.ts +++ b/src/agents/sandbox/fs-bridge.test.ts @@ -14,6 +14,22 @@ import type { SandboxContext } from "./types.js"; const mockedExecDockerRaw = vi.mocked(execDockerRaw); +function getDockerScript(args: string[]): string { + return String(args[5] ?? ""); +} + +function getDockerPathArg(args: string[]): string { + return String(args.at(-1) ?? ""); +} + +function getScriptsFromCalls(): string[] { + return mockedExecDockerRaw.mock.calls.map(([args]) => getDockerScript(args)); +} + +function findCallByScriptFragment(fragment: string) { + return mockedExecDockerRaw.mock.calls.find(([args]) => getDockerScript(args).includes(fragment)); +} + function createSandbox(overrides?: Partial): SandboxContext { return createSandboxTestContext({ overrides: { @@ -31,7 +47,7 @@ describe("sandbox fs bridge shell compatibility", () => { beforeEach(() => { mockedExecDockerRaw.mockClear(); mockedExecDockerRaw.mockImplementation(async (args) => { - const script = args[5] ?? ""; + const script = getDockerScript(args); if (script.includes('readlink -f -- "$cursor"')) { return { stdout: Buffer.from(`${String(args.at(-2) ?? "")}\n`), @@ -73,7 +89,7 @@ describe("sandbox fs bridge shell compatibility", () => { expect(mockedExecDockerRaw).toHaveBeenCalled(); - const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? ""); + const scripts = getScriptsFromCalls(); const executables = mockedExecDockerRaw.mock.calls.map(([args]) => args[3] ?? ""); expect(executables.every((shell) => shell === "sh")).toBe(true); @@ -86,7 +102,7 @@ describe("sandbox fs bridge shell compatibility", () => { await bridge.readFile({ filePath: "a.txt" }); - const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? ""); + const scripts = getScriptsFromCalls(); const canonicalScript = scripts.find((script) => script.includes("allow_final")); expect(canonicalScript).toBeDefined(); // "; " joining can create "do; cmd", which is invalid in POSIX sh. @@ -101,11 +117,9 @@ describe("sandbox fs bridge shell compatibility", () => { await bridge.readFile({ filePath: inboundPath }); - const readCall = mockedExecDockerRaw.mock.calls.find(([args]) => - String(args[5] ?? "").includes('cat -- "$1"'), - ); + const readCall = findCallByScriptFragment('cat -- "$1"'); expect(readCall).toBeDefined(); - const readPath = String(readCall?.[0].at(-1) ?? ""); + const readPath = readCall ? getDockerPathArg(readCall[0]) : ""; expect(readPath).toContain("file_1095---"); }); @@ -124,7 +138,7 @@ describe("sandbox fs bridge shell compatibility", () => { expect(args).toEqual( expect.arrayContaining(["moltbot-sbx-test", "sh", "-c", 'set -eu; cat -- "$1"']), ); - expect(args.at(-1)).toBe("/workspace-two/README.md"); + expect(getDockerPathArg(args)).toBe("/workspace-two/README.md"); }); it("blocks writes into read-only bind mounts", async () => { @@ -166,7 +180,7 @@ describe("sandbox fs bridge shell compatibility", () => { it("rejects container-canonicalized paths outside allowed mounts", async () => { mockedExecDockerRaw.mockImplementation(async (args) => { - const script = args[5] ?? ""; + const script = getDockerScript(args); if (script.includes('readlink -f -- "$cursor"')) { return { stdout: Buffer.from("/etc/passwd\n"), @@ -190,7 +204,7 @@ describe("sandbox fs bridge shell compatibility", () => { const bridge = createSandboxFsBridge({ sandbox: createSandbox() }); await expect(bridge.readFile({ filePath: "a.txt" })).rejects.toThrow(/escapes allowed mounts/i); - const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? ""); + const scripts = getScriptsFromCalls(); expect(scripts.some((script) => script.includes('cat -- "$1"'))).toBe(false); }); }); diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 60eade41f..750eab43b 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -35,7 +35,10 @@ type DispatchInboundParams = { text?: string; isReasoning?: boolean; }) => boolean | Promise; - sendFinalReply: (payload: { text?: string }) => boolean | Promise; + sendFinalReply: (payload: { + text?: string; + isReasoning?: boolean; + }) => boolean | Promise; }; replyOptions?: { onReasoningStream?: () => Promise | void; @@ -449,7 +452,7 @@ describe("processDiscordMessage draft streaming", () => { await params?.dispatcher.sendFinalReply({ text: "Reasoning:\nthis should stay internal", isReasoning: true, - } as never); + }); return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; });