refactor(matrix,tests): extract helpers and inject send-queue timing
This commit is contained in:
@@ -5,6 +5,32 @@ import { sendReadReceiptMatrix } from "../send.js";
|
||||
import type { MatrixRawEvent } from "./types.js";
|
||||
import { EventType } from "./types.js";
|
||||
|
||||
function createSelfUserIdResolver(client: Pick<MatrixClient, "getUserId">) {
|
||||
let selfUserId: string | undefined;
|
||||
let selfUserIdLookup: Promise<string | undefined> | undefined;
|
||||
|
||||
return async (): Promise<string | undefined> => {
|
||||
if (selfUserId) {
|
||||
return selfUserId;
|
||||
}
|
||||
if (!selfUserIdLookup) {
|
||||
selfUserIdLookup = client
|
||||
.getUserId()
|
||||
.then((userId) => {
|
||||
selfUserId = userId;
|
||||
return userId;
|
||||
})
|
||||
.catch(() => undefined)
|
||||
.finally(() => {
|
||||
if (!selfUserId) {
|
||||
selfUserIdLookup = undefined;
|
||||
}
|
||||
});
|
||||
}
|
||||
return await selfUserIdLookup;
|
||||
};
|
||||
}
|
||||
|
||||
export function registerMatrixMonitorEvents(params: {
|
||||
client: MatrixClient;
|
||||
auth: MatrixAuth;
|
||||
@@ -26,28 +52,7 @@ export function registerMatrixMonitorEvents(params: {
|
||||
onRoomMessage,
|
||||
} = params;
|
||||
|
||||
let selfUserId: string | undefined;
|
||||
let selfUserIdLookup: Promise<string | undefined> | undefined;
|
||||
const resolveSelfUserId = async (): Promise<string | undefined> => {
|
||||
if (selfUserId) {
|
||||
return selfUserId;
|
||||
}
|
||||
if (!selfUserIdLookup) {
|
||||
selfUserIdLookup = client
|
||||
.getUserId()
|
||||
.then((userId) => {
|
||||
selfUserId = userId;
|
||||
return userId;
|
||||
})
|
||||
.catch(() => undefined)
|
||||
.finally(() => {
|
||||
if (!selfUserId) {
|
||||
selfUserIdLookup = undefined;
|
||||
}
|
||||
});
|
||||
}
|
||||
return await selfUserIdLookup;
|
||||
};
|
||||
const resolveSelfUserId = createSelfUserIdResolver(client);
|
||||
client.on("room.message", (roomId: string, event: MatrixRawEvent) => {
|
||||
const eventId = event?.event_id;
|
||||
const senderId = event?.sender;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { enqueueSend } from "./send-queue.js";
|
||||
import { DEFAULT_SEND_GAP_MS, enqueueSend } from "./send-queue.js";
|
||||
|
||||
function deferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
@@ -36,15 +36,15 @@ describe("enqueueSend", () => {
|
||||
return "two";
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
expect(events).toEqual(["start1"]);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS * 2);
|
||||
expect(events).toEqual(["start1"]);
|
||||
|
||||
gate.resolve();
|
||||
await first;
|
||||
await vi.advanceTimersByTimeAsync(149);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS - 1);
|
||||
expect(events).toEqual(["start1", "end1"]);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await second;
|
||||
@@ -63,7 +63,7 @@ describe("enqueueSend", () => {
|
||||
return "b";
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
await Promise.all([a, b]);
|
||||
expect(events.sort()).toEqual(["a", "b"]);
|
||||
});
|
||||
@@ -76,14 +76,14 @@ describe("enqueueSend", () => {
|
||||
(error) => ({ ok: false as const, error }),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
const firstResult = await first;
|
||||
expect(firstResult.ok).toBe(false);
|
||||
expect(firstResult.error).toBeInstanceOf(Error);
|
||||
expect((firstResult.error as Error).message).toBe("boom");
|
||||
|
||||
const second = enqueueSend("!room:example.org", async () => "ok");
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
await expect(second).resolves.toBe("ok");
|
||||
});
|
||||
|
||||
@@ -104,7 +104,7 @@ describe("enqueueSend", () => {
|
||||
return "two";
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
expect(events).toEqual(["start1"]);
|
||||
|
||||
gate.resolve();
|
||||
@@ -112,8 +112,37 @@ describe("enqueueSend", () => {
|
||||
expect(firstResult.ok).toBe(false);
|
||||
expect(firstResult.error).toBeInstanceOf(Error);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_SEND_GAP_MS);
|
||||
await expect(second).resolves.toBe("two");
|
||||
expect(events).toEqual(["start1", "start2"]);
|
||||
});
|
||||
|
||||
it("supports custom gap and delay injection", async () => {
|
||||
const events: string[] = [];
|
||||
const delayFn = vi.fn(async (_ms: number) => {});
|
||||
|
||||
const first = enqueueSend(
|
||||
"!room:example.org",
|
||||
async () => {
|
||||
events.push("first");
|
||||
return "one";
|
||||
},
|
||||
{ gapMs: 7, delayFn },
|
||||
);
|
||||
const second = enqueueSend(
|
||||
"!room:example.org",
|
||||
async () => {
|
||||
events.push("second");
|
||||
return "two";
|
||||
},
|
||||
{ gapMs: 7, delayFn },
|
||||
);
|
||||
|
||||
await expect(first).resolves.toBe("one");
|
||||
await expect(second).resolves.toBe("two");
|
||||
expect(events).toEqual(["first", "second"]);
|
||||
expect(delayFn).toHaveBeenCalledTimes(2);
|
||||
expect(delayFn).toHaveBeenNthCalledWith(1, 7);
|
||||
expect(delayFn).toHaveBeenNthCalledWith(2, 7);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,15 +1,26 @@
|
||||
const SEND_GAP_MS = 150;
|
||||
export const DEFAULT_SEND_GAP_MS = 150;
|
||||
|
||||
type MatrixSendQueueOptions = {
|
||||
gapMs?: number;
|
||||
delayFn?: (ms: number) => Promise<void>;
|
||||
};
|
||||
|
||||
// Serialize sends per room to preserve Matrix delivery order.
|
||||
const roomQueues = new Map<string, Promise<void>>();
|
||||
|
||||
export async function enqueueSend<T>(roomId: string, fn: () => Promise<T>): Promise<T> {
|
||||
export async function enqueueSend<T>(
|
||||
roomId: string,
|
||||
fn: () => Promise<T>,
|
||||
options?: MatrixSendQueueOptions,
|
||||
): Promise<T> {
|
||||
const gapMs = options?.gapMs ?? DEFAULT_SEND_GAP_MS;
|
||||
const delayFn = options?.delayFn ?? delay;
|
||||
const previous = roomQueues.get(roomId) ?? Promise.resolve();
|
||||
|
||||
const next = previous
|
||||
.catch(() => {})
|
||||
.then(async () => {
|
||||
await delay(SEND_GAP_MS);
|
||||
await delayFn(gapMs);
|
||||
return await fn();
|
||||
});
|
||||
|
||||
|
||||
@@ -14,6 +14,22 @@ import type { SandboxContext } from "./types.js";
|
||||
|
||||
const mockedExecDockerRaw = vi.mocked(execDockerRaw);
|
||||
|
||||
function getDockerScript(args: string[]): string {
|
||||
return String(args[5] ?? "");
|
||||
}
|
||||
|
||||
function getDockerPathArg(args: string[]): string {
|
||||
return String(args.at(-1) ?? "");
|
||||
}
|
||||
|
||||
function getScriptsFromCalls(): string[] {
|
||||
return mockedExecDockerRaw.mock.calls.map(([args]) => getDockerScript(args));
|
||||
}
|
||||
|
||||
function findCallByScriptFragment(fragment: string) {
|
||||
return mockedExecDockerRaw.mock.calls.find(([args]) => getDockerScript(args).includes(fragment));
|
||||
}
|
||||
|
||||
function createSandbox(overrides?: Partial<SandboxContext>): SandboxContext {
|
||||
return createSandboxTestContext({
|
||||
overrides: {
|
||||
@@ -31,7 +47,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
beforeEach(() => {
|
||||
mockedExecDockerRaw.mockClear();
|
||||
mockedExecDockerRaw.mockImplementation(async (args) => {
|
||||
const script = args[5] ?? "";
|
||||
const script = getDockerScript(args);
|
||||
if (script.includes('readlink -f -- "$cursor"')) {
|
||||
return {
|
||||
stdout: Buffer.from(`${String(args.at(-2) ?? "")}\n`),
|
||||
@@ -73,7 +89,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
|
||||
expect(mockedExecDockerRaw).toHaveBeenCalled();
|
||||
|
||||
const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? "");
|
||||
const scripts = getScriptsFromCalls();
|
||||
const executables = mockedExecDockerRaw.mock.calls.map(([args]) => args[3] ?? "");
|
||||
|
||||
expect(executables.every((shell) => shell === "sh")).toBe(true);
|
||||
@@ -86,7 +102,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
|
||||
await bridge.readFile({ filePath: "a.txt" });
|
||||
|
||||
const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? "");
|
||||
const scripts = getScriptsFromCalls();
|
||||
const canonicalScript = scripts.find((script) => script.includes("allow_final"));
|
||||
expect(canonicalScript).toBeDefined();
|
||||
// "; " joining can create "do; cmd", which is invalid in POSIX sh.
|
||||
@@ -101,11 +117,9 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
|
||||
await bridge.readFile({ filePath: inboundPath });
|
||||
|
||||
const readCall = mockedExecDockerRaw.mock.calls.find(([args]) =>
|
||||
String(args[5] ?? "").includes('cat -- "$1"'),
|
||||
);
|
||||
const readCall = findCallByScriptFragment('cat -- "$1"');
|
||||
expect(readCall).toBeDefined();
|
||||
const readPath = String(readCall?.[0].at(-1) ?? "");
|
||||
const readPath = readCall ? getDockerPathArg(readCall[0]) : "";
|
||||
expect(readPath).toContain("file_1095---");
|
||||
});
|
||||
|
||||
@@ -124,7 +138,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
expect(args).toEqual(
|
||||
expect.arrayContaining(["moltbot-sbx-test", "sh", "-c", 'set -eu; cat -- "$1"']),
|
||||
);
|
||||
expect(args.at(-1)).toBe("/workspace-two/README.md");
|
||||
expect(getDockerPathArg(args)).toBe("/workspace-two/README.md");
|
||||
});
|
||||
|
||||
it("blocks writes into read-only bind mounts", async () => {
|
||||
@@ -166,7 +180,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
|
||||
it("rejects container-canonicalized paths outside allowed mounts", async () => {
|
||||
mockedExecDockerRaw.mockImplementation(async (args) => {
|
||||
const script = args[5] ?? "";
|
||||
const script = getDockerScript(args);
|
||||
if (script.includes('readlink -f -- "$cursor"')) {
|
||||
return {
|
||||
stdout: Buffer.from("/etc/passwd\n"),
|
||||
@@ -190,7 +204,7 @@ describe("sandbox fs bridge shell compatibility", () => {
|
||||
|
||||
const bridge = createSandboxFsBridge({ sandbox: createSandbox() });
|
||||
await expect(bridge.readFile({ filePath: "a.txt" })).rejects.toThrow(/escapes allowed mounts/i);
|
||||
const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? "");
|
||||
const scripts = getScriptsFromCalls();
|
||||
expect(scripts.some((script) => script.includes('cat -- "$1"'))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -35,7 +35,10 @@ type DispatchInboundParams = {
|
||||
text?: string;
|
||||
isReasoning?: boolean;
|
||||
}) => boolean | Promise<boolean>;
|
||||
sendFinalReply: (payload: { text?: string }) => boolean | Promise<boolean>;
|
||||
sendFinalReply: (payload: {
|
||||
text?: string;
|
||||
isReasoning?: boolean;
|
||||
}) => boolean | Promise<boolean>;
|
||||
};
|
||||
replyOptions?: {
|
||||
onReasoningStream?: () => Promise<void> | void;
|
||||
@@ -449,7 +452,7 @@ describe("processDiscordMessage draft streaming", () => {
|
||||
await params?.dispatcher.sendFinalReply({
|
||||
text: "Reasoning:\nthis should stay internal",
|
||||
isReasoning: true,
|
||||
} as never);
|
||||
});
|
||||
return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } };
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user