perf(test): cut gateway unit suite overhead

This commit is contained in:
Peter Steinberger
2026-02-15 23:28:07 +00:00
parent be4a490c23
commit 67bfe8fb80
12 changed files with 349 additions and 551 deletions

View File

@@ -165,7 +165,7 @@ const defaultWorkerBudget =
unit: Math.max(2, Math.min(8, Math.floor(localWorkers / 2))),
unitIsolated: 1,
extensions: Math.max(1, Math.min(4, Math.floor(localWorkers / 4))),
gateway: 1,
gateway: 2,
};
// Keep worker counts predictable for local runs; trim macOS CI workers to avoid worker crashes/OOM.

View File

@@ -1,31 +0,0 @@
import { describe, expect, test, vi } from "vitest";
import { GatewayClient } from "./client.js";
const wsMockState = vi.hoisted(() => ({
last: null as { url: unknown; opts: unknown } | null,
}));
vi.mock("ws", () => ({
WebSocket: class MockWebSocket {
on = vi.fn();
close = vi.fn();
send = vi.fn();
constructor(url: unknown, opts: unknown) {
wsMockState.last = { url, opts };
}
},
}));
describe("GatewayClient", () => {
test("uses a large maxPayload for node snapshots", () => {
wsMockState.last = null;
const client = new GatewayClient({ url: "ws://127.0.0.1:1" });
client.start();
expect(wsMockState.last?.url).toBe("ws://127.0.0.1:1");
expect(wsMockState.last?.opts).toEqual(
expect.objectContaining({ maxPayload: 25 * 1024 * 1024 }),
);
});
});

View File

@@ -1,44 +0,0 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { handleControlUiHttpRequest } from "./control-ui.js";
const makeResponse = (): {
res: ServerResponse;
setHeader: ReturnType<typeof vi.fn>;
end: ReturnType<typeof vi.fn>;
} => {
const setHeader = vi.fn();
const end = vi.fn();
const res = {
headersSent: false,
statusCode: 200,
setHeader,
end,
} as unknown as ServerResponse;
return { res, setHeader, end };
};
describe("handleControlUiHttpRequest", () => {
it("sets anti-clickjacking headers for Control UI responses", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ui-"));
try {
await fs.writeFile(path.join(tmp, "index.html"), "<html></html>\n");
const { res, setHeader } = makeResponse();
const handled = handleControlUiHttpRequest(
{ url: "/", method: "GET" } as IncomingMessage,
res,
{
root: { kind: "resolved", path: tmp },
},
);
expect(handled).toBe(true);
expect(setHeader).toHaveBeenCalledWith("X-Frame-Options", "DENY");
expect(setHeader).toHaveBeenCalledWith("Content-Security-Policy", "frame-ancestors 'none'");
} finally {
await fs.rm(tmp, { recursive: true, force: true });
}
});
});

View File

@@ -0,0 +1,283 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, test, vi } from "vitest";
import type { RequestFrame } from "./protocol/index.js";
import type { GatewayClient as GatewayMethodClient } from "./server-methods/types.js";
import type { GatewayRequestContext, RespondFn } from "./server-methods/types.js";
import type { GatewayWsClient } from "./server/ws-types.js";
import { GatewayClient } from "./client.js";
import { handleControlUiHttpRequest } from "./control-ui.js";
import {
DEFAULT_DANGEROUS_NODE_COMMANDS,
resolveNodeCommandAllowlist,
} from "./node-command-policy.js";
import { createGatewayBroadcaster } from "./server-broadcast.js";
import { createChatRunRegistry } from "./server-chat.js";
import { handleNodeInvokeResult } from "./server-methods/nodes.handlers.invoke-result.js";
import { createNodeSubscriptionManager } from "./server-node-subscriptions.js";
const wsMockState = vi.hoisted(() => ({
last: null as { url: unknown; opts: unknown } | null,
}));
vi.mock("ws", () => ({
WebSocket: class MockWebSocket {
on = vi.fn();
close = vi.fn();
send = vi.fn();
constructor(url: unknown, opts: unknown) {
wsMockState.last = { url, opts };
}
},
}));
describe("GatewayClient", () => {
test("uses a large maxPayload for node snapshots", () => {
wsMockState.last = null;
const client = new GatewayClient({ url: "ws://127.0.0.1:1" });
client.start();
expect(wsMockState.last?.url).toBe("ws://127.0.0.1:1");
expect(wsMockState.last?.opts).toEqual(
expect.objectContaining({ maxPayload: 25 * 1024 * 1024 }),
);
});
});
const makeControlUiResponse = (): {
res: ServerResponse;
setHeader: ReturnType<typeof vi.fn>;
end: ReturnType<typeof vi.fn>;
} => {
const setHeader = vi.fn();
const end = vi.fn();
const res = {
headersSent: false,
statusCode: 200,
setHeader,
end,
} as unknown as ServerResponse;
return { res, setHeader, end };
};
describe("handleControlUiHttpRequest", () => {
it("sets anti-clickjacking headers for Control UI responses", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ui-"));
try {
await fs.writeFile(path.join(tmp, "index.html"), "<html></html>\n");
const { res, setHeader } = makeControlUiResponse();
const handled = handleControlUiHttpRequest(
{ url: "/", method: "GET" } as IncomingMessage,
res,
{
root: { kind: "resolved", path: tmp },
},
);
expect(handled).toBe(true);
expect(setHeader).toHaveBeenCalledWith("X-Frame-Options", "DENY");
expect(setHeader).toHaveBeenCalledWith("Content-Security-Policy", "frame-ancestors 'none'");
} finally {
await fs.rm(tmp, { recursive: true, force: true });
}
});
});
type TestSocket = {
bufferedAmount: number;
send: (payload: string) => void;
close: (code: number, reason: string) => void;
};
describe("gateway broadcaster", () => {
it("filters approval and pairing events by scope", () => {
const approvalsSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const pairingSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const readSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const clients = new Set<GatewayWsClient>([
{
socket: approvalsSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.approvals"] } as GatewayWsClient["connect"],
connId: "c-approvals",
},
{
socket: pairingSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
connId: "c-pairing",
},
{
socket: readSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
connId: "c-read",
},
]);
const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients });
broadcast("exec.approval.requested", { id: "1" });
broadcast("device.pair.requested", { requestId: "r1" });
expect(approvalsSocket.send).toHaveBeenCalledTimes(1);
expect(pairingSocket.send).toHaveBeenCalledTimes(1);
expect(readSocket.send).toHaveBeenCalledTimes(0);
broadcastToConnIds("tick", { ts: 1 }, new Set(["c-read"]));
expect(readSocket.send).toHaveBeenCalledTimes(1);
expect(approvalsSocket.send).toHaveBeenCalledTimes(1);
expect(pairingSocket.send).toHaveBeenCalledTimes(1);
});
});
describe("chat run registry", () => {
test("queues and removes runs per session", () => {
const registry = createChatRunRegistry();
registry.add("s1", { sessionKey: "main", clientRunId: "c1" });
registry.add("s1", { sessionKey: "main", clientRunId: "c2" });
expect(registry.peek("s1")?.clientRunId).toBe("c1");
expect(registry.shift("s1")?.clientRunId).toBe("c1");
expect(registry.peek("s1")?.clientRunId).toBe("c2");
expect(registry.remove("s1", "c2")?.clientRunId).toBe("c2");
expect(registry.peek("s1")).toBeUndefined();
});
});
describe("late-arriving invoke results", () => {
test("returns success for unknown invoke ids for both success and error payloads", async () => {
const nodeId = "node-123";
const cases = [
{
id: "unknown-invoke-id-12345",
ok: true,
payloadJSON: JSON.stringify({ result: "late" }),
},
{
id: "another-unknown-invoke-id",
ok: false,
error: { code: "FAILED", message: "test error" },
},
] as const;
for (const params of cases) {
const respond = vi.fn<RespondFn>();
const context = {
nodeRegistry: { handleInvokeResult: () => false },
logGateway: { debug: vi.fn() },
} as unknown as GatewayRequestContext;
const client = {
connect: { device: { id: nodeId } },
} as unknown as GatewayMethodClient;
await handleNodeInvokeResult({
req: { method: "node.invoke.result" } as unknown as RequestFrame,
params: { ...params, nodeId } as unknown as Record<string, unknown>,
client,
isWebchatConnect: () => false,
respond,
context,
});
const [ok, payload, error] = respond.mock.lastCall ?? [];
// Late-arriving results return success instead of error to reduce log noise.
expect(ok).toBe(true);
expect(error).toBeUndefined();
expect(payload?.ok).toBe(true);
expect(payload?.ignored).toBe(true);
}
});
});
describe("node subscription manager", () => {
test("routes events to subscribed nodes", () => {
const manager = createNodeSubscriptionManager();
const sent: Array<{
nodeId: string;
event: string;
payloadJSON?: string | null;
}> = [];
const sendEvent = (evt: { nodeId: string; event: string; payloadJSON?: string | null }) =>
sent.push(evt);
manager.subscribe("node-a", "main");
manager.subscribe("node-b", "main");
manager.sendToSession("main", "chat", { ok: true }, sendEvent);
expect(sent).toHaveLength(2);
expect(sent.map((s) => s.nodeId).toSorted()).toEqual(["node-a", "node-b"]);
expect(sent[0].event).toBe("chat");
});
test("unsubscribeAll clears session mappings", () => {
const manager = createNodeSubscriptionManager();
const sent: string[] = [];
const sendEvent = (evt: { nodeId: string; event: string }) =>
sent.push(`${evt.nodeId}:${evt.event}`);
manager.subscribe("node-a", "main");
manager.subscribe("node-a", "secondary");
manager.unsubscribeAll("node-a");
manager.sendToSession("main", "tick", {}, sendEvent);
manager.sendToSession("secondary", "tick", {}, sendEvent);
expect(sent).toEqual([]);
});
});
describe("resolveNodeCommandAllowlist", () => {
it("includes iOS service commands by default", () => {
const allow = resolveNodeCommandAllowlist(
{},
{
platform: "ios 26.0",
deviceFamily: "iPhone",
},
);
expect(allow.has("device.info")).toBe(true);
expect(allow.has("device.status")).toBe(true);
expect(allow.has("system.notify")).toBe(true);
expect(allow.has("contacts.search")).toBe(true);
expect(allow.has("calendar.events")).toBe(true);
expect(allow.has("reminders.list")).toBe(true);
expect(allow.has("photos.latest")).toBe(true);
expect(allow.has("motion.activity")).toBe(true);
for (const cmd of DEFAULT_DANGEROUS_NODE_COMMANDS) {
expect(allow.has(cmd)).toBe(false);
}
});
it("can explicitly allow dangerous commands via allowCommands", () => {
const allow = resolveNodeCommandAllowlist(
{
gateway: {
nodes: {
allowCommands: ["camera.snap", "screen.record"],
},
},
},
{ platform: "ios", deviceFamily: "iPhone" },
);
expect(allow.has("camera.snap")).toBe(true);
expect(allow.has("screen.record")).toBe(true);
expect(allow.has("camera.clip")).toBe(false);
});
});

View File

@@ -1,46 +0,0 @@
import { describe, expect, it } from "vitest";
import {
DEFAULT_DANGEROUS_NODE_COMMANDS,
resolveNodeCommandAllowlist,
} from "./node-command-policy.js";
describe("resolveNodeCommandAllowlist", () => {
it("includes iOS service commands by default", () => {
const allow = resolveNodeCommandAllowlist(
{},
{
platform: "ios 26.0",
deviceFamily: "iPhone",
},
);
expect(allow.has("device.info")).toBe(true);
expect(allow.has("device.status")).toBe(true);
expect(allow.has("system.notify")).toBe(true);
expect(allow.has("contacts.search")).toBe(true);
expect(allow.has("calendar.events")).toBe(true);
expect(allow.has("reminders.list")).toBe(true);
expect(allow.has("photos.latest")).toBe(true);
expect(allow.has("motion.activity")).toBe(true);
for (const cmd of DEFAULT_DANGEROUS_NODE_COMMANDS) {
expect(allow.has(cmd)).toBe(false);
}
});
it("can explicitly allow dangerous commands via allowCommands", () => {
const allow = resolveNodeCommandAllowlist(
{
gateway: {
nodes: {
allowCommands: ["camera.snap", "screen.record"],
},
},
},
{ platform: "ios", deviceFamily: "iPhone" },
);
expect(allow.has("camera.snap")).toBe(true);
expect(allow.has("screen.record")).toBe(true);
expect(allow.has("camera.clip")).toBe(false);
});
});

View File

@@ -1,61 +0,0 @@
import { describe, expect, it, vi } from "vitest";
import type { GatewayWsClient } from "./server/ws-types.js";
import { createGatewayBroadcaster } from "./server-broadcast.js";
type TestSocket = {
bufferedAmount: number;
send: (payload: string) => void;
close: (code: number, reason: string) => void;
};
describe("gateway broadcaster", () => {
it("filters approval and pairing events by scope", () => {
const approvalsSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const pairingSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const readSocket: TestSocket = {
bufferedAmount: 0,
send: vi.fn(),
close: vi.fn(),
};
const clients = new Set<GatewayWsClient>([
{
socket: approvalsSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.approvals"] } as GatewayWsClient["connect"],
connId: "c-approvals",
},
{
socket: pairingSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.pairing"] } as GatewayWsClient["connect"],
connId: "c-pairing",
},
{
socket: readSocket as unknown as GatewayWsClient["socket"],
connect: { role: "operator", scopes: ["operator.read"] } as GatewayWsClient["connect"],
connId: "c-read",
},
]);
const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients });
broadcast("exec.approval.requested", { id: "1" });
broadcast("device.pair.requested", { requestId: "r1" });
expect(approvalsSocket.send).toHaveBeenCalledTimes(1);
expect(pairingSocket.send).toHaveBeenCalledTimes(1);
expect(readSocket.send).toHaveBeenCalledTimes(0);
broadcastToConnIds("tick", { ts: 1 }, new Set(["c-read"]));
expect(readSocket.send).toHaveBeenCalledTimes(1);
expect(approvalsSocket.send).toHaveBeenCalledTimes(1);
expect(pairingSocket.send).toHaveBeenCalledTimes(1);
});
});

View File

@@ -1,18 +0,0 @@
import { describe, expect, test } from "vitest";
import { createChatRunRegistry } from "./server-chat.js";
describe("chat run registry", () => {
test("queues and removes runs per session", () => {
const registry = createChatRunRegistry();
registry.add("s1", { sessionKey: "main", clientRunId: "c1" });
registry.add("s1", { sessionKey: "main", clientRunId: "c2" });
expect(registry.peek("s1")?.clientRunId).toBe("c1");
expect(registry.shift("s1")?.clientRunId).toBe("c1");
expect(registry.peek("s1")?.clientRunId).toBe("c2");
expect(registry.remove("s1", "c2")?.clientRunId).toBe("c2");
expect(registry.peek("s1")).toBeUndefined();
});
});

View File

@@ -1,38 +0,0 @@
import { describe, expect, test } from "vitest";
import { createNodeSubscriptionManager } from "./server-node-subscriptions.js";
describe("node subscription manager", () => {
test("routes events to subscribed nodes", () => {
const manager = createNodeSubscriptionManager();
const sent: Array<{
nodeId: string;
event: string;
payloadJSON?: string | null;
}> = [];
const sendEvent = (evt: { nodeId: string; event: string; payloadJSON?: string | null }) =>
sent.push(evt);
manager.subscribe("node-a", "main");
manager.subscribe("node-b", "main");
manager.sendToSession("main", "chat", { ok: true }, sendEvent);
expect(sent).toHaveLength(2);
expect(sent.map((s) => s.nodeId).toSorted()).toEqual(["node-a", "node-b"]);
expect(sent[0].event).toBe("chat");
});
test("unsubscribeAll clears session mappings", () => {
const manager = createNodeSubscriptionManager();
const sent: string[] = [];
const sendEvent = (evt: { nodeId: string; event: string }) =>
sent.push(`${evt.nodeId}:${evt.event}`);
manager.subscribe("node-a", "main");
manager.subscribe("node-a", "secondary");
manager.unsubscribeAll("node-a");
manager.sendToSession("main", "tick", {}, sendEvent);
manager.sendToSession("secondary", "tick", {}, sendEvent);
expect(sent).toEqual([]);
});
});

View File

@@ -1,105 +0,0 @@
/**
* E2E test for config reload during active reply sending.
* Tests that gateway restart is properly deferred until replies are sent.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
clearAllDispatchers,
getTotalPendingReplies,
} from "../auto-reply/reply/dispatcher-registry.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getTotalQueueSize } from "../process/command-queue.js";
// Helper to flush all pending microtasks
async function flushMicrotasks() {
for (let i = 0; i < 10; i++) {
await Promise.resolve();
}
}
describe("gateway config reload during reply", () => {
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(async () => {
vi.restoreAllMocks();
// Wait for any pending microtasks (from markComplete()) to complete
await flushMicrotasks();
clearAllDispatchers();
});
it("should defer restart until reply dispatcher completes", async () => {
// Create a dispatcher (simulating message handling)
let deliveredReplies: string[] = [];
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
// Keep delivery asynchronous without real wall-clock delay.
await Promise.resolve();
deliveredReplies.push(payload.text ?? "");
},
onError: (err) => {
throw err;
},
});
// Initially: pending=1 (reservation)
expect(getTotalPendingReplies()).toBe(1);
// Simulate command finishing and enqueuing reply
dispatcher.sendFinalReply({ text: "Configuration updated successfully!" });
// Now: pending=2 (reservation + 1 enqueued reply)
expect(getTotalPendingReplies()).toBe(2);
// Mark dispatcher complete (flags reservation for cleanup on last delivery)
dispatcher.markComplete();
// Reservation is still counted until the delivery .finally() clears it,
// but the important invariant is pending > 0 while delivery is in flight.
expect(getTotalPendingReplies()).toBeGreaterThan(0);
// At this point, if gateway restart was requested, it should defer
// because getTotalPendingReplies() > 0
// Wait for reply to be delivered
await dispatcher.waitForIdle();
// Now: pending=0 (reply sent)
expect(getTotalPendingReplies()).toBe(0);
expect(deliveredReplies).toEqual(["Configuration updated successfully!"]);
// Now restart can proceed safely
expect(getTotalQueueSize()).toBe(0);
expect(getTotalPendingReplies()).toBe(0);
});
it("should handle dispatcher reservation correctly when no replies sent", async () => {
const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js");
let deliverCalled = false;
const dispatcher = createReplyDispatcher({
deliver: async () => {
deliverCalled = true;
},
});
// Initially: pending=1 (reservation)
expect(getTotalPendingReplies()).toBe(1);
// Mark complete without sending any replies
dispatcher.markComplete();
// Reservation is cleared via microtask — flush it
await flushMicrotasks();
// Now: pending=0 (reservation cleared, no replies were enqueued)
expect(getTotalPendingReplies()).toBe(0);
// Wait for idle (should resolve immediately since no replies)
await dispatcher.waitForIdle();
expect(deliverCalled).toBe(false);
expect(getTotalPendingReplies()).toBe(0);
});
});

View File

@@ -1,120 +0,0 @@
/**
* Integration test simulating full message handling + config change + reply flow.
* This tests the complete scenario where a user configures an adapter via chat
* and ensures they get a reply before the gateway restarts.
*/
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
import {
clearAllDispatchers,
getTotalPendingReplies,
} from "../auto-reply/reply/dispatcher-registry.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getTotalQueueSize } from "../process/command-queue.js";
describe("gateway restart deferral integration", () => {
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(async () => {
vi.restoreAllMocks();
// Wait for any pending microtasks (from markComplete()) to complete
await Promise.resolve();
clearAllDispatchers();
});
it("should defer restart until dispatcher completes with reply", async () => {
const events: string[] = [];
// T=0: Message received — dispatcher created (pending=1 reservation)
events.push("message-received");
const deliveredReplies: Array<{ text: string; timestamp: number }> = [];
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
// Keep delivery asynchronous without real wall-clock delay.
await Promise.resolve();
deliveredReplies.push({
text: payload.text ?? "",
timestamp: Date.now(),
});
events.push(`reply-delivered: ${payload.text}`);
},
});
events.push("dispatcher-created");
// T=1: Config change detected
events.push("config-change-detected");
// Check if restart should be deferred
const queueSize = getTotalQueueSize();
const pendingReplies = getTotalPendingReplies();
const totalActive = queueSize + pendingReplies;
events.push(`defer-check: queue=${queueSize} pending=${pendingReplies} total=${totalActive}`);
// Should defer because dispatcher has reservation
expect(totalActive).toBeGreaterThan(0);
expect(pendingReplies).toBe(1); // reservation
if (totalActive > 0) {
events.push("restart-deferred");
}
// T=2: Command finishes, enqueue replies
dispatcher.sendFinalReply({ text: "Adapter configured successfully!" });
dispatcher.sendFinalReply({ text: "Gateway will restart to apply changes." });
events.push("replies-enqueued");
// Now pending should be 3 (reservation + 2 replies)
expect(getTotalPendingReplies()).toBe(3);
// Mark command complete (flags reservation for cleanup on last delivery)
dispatcher.markComplete();
events.push("command-complete");
// Reservation still counted until delivery .finally() clears it,
// but the important invariant is pending > 0 while deliveries are in flight.
expect(getTotalPendingReplies()).toBeGreaterThan(0);
// T=3: Wait for replies to be delivered
await dispatcher.waitForIdle();
events.push("dispatcher-idle");
// Replies should be delivered
expect(deliveredReplies).toHaveLength(2);
expect(deliveredReplies[0].text).toBe("Adapter configured successfully!");
expect(deliveredReplies[1].text).toBe("Gateway will restart to apply changes.");
// Pending should be 0
expect(getTotalPendingReplies()).toBe(0);
// T=4: Check if restart can proceed
const finalQueueSize = getTotalQueueSize();
const finalPendingReplies = getTotalPendingReplies();
const finalTotalActive = finalQueueSize + finalPendingReplies;
events.push(
`restart-check: queue=${finalQueueSize} pending=${finalPendingReplies} total=${finalTotalActive}`,
);
// Everything should be idle now
expect(finalTotalActive).toBe(0);
events.push("restart-can-proceed");
// Verify event sequence
expect(events).toEqual([
"message-received",
"dispatcher-created",
"config-change-detected",
"defer-check: queue=0 pending=1 total=1",
"restart-deferred",
"replies-enqueued",
"command-complete",
"reply-delivered: Adapter configured successfully!",
"reply-delivered: Gateway will restart to apply changes.",
"dispatcher-idle",
"restart-check: queue=0 pending=0 total=0",
"restart-can-proceed",
]);
});
});

View File

@@ -1,13 +1,17 @@
/**
* REAL scenario test - simulates actual message handling with config changes.
* This test MUST fail if "imsg rpc not running" would occur in production.
*/
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
clearAllDispatchers,
getTotalPendingReplies,
} from "../auto-reply/reply/dispatcher-registry.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getTotalQueueSize } from "../process/command-queue.js";
async function flushMicrotasks(count = 10): Promise<void> {
for (let i = 0; i < count; i += 1) {
// eslint-disable-next-line no-await-in-loop
await Promise.resolve();
}
}
function createDeferred<T = void>() {
let resolve!: (value: T | PromiseLike<T>) => void;
@@ -19,7 +23,7 @@ function createDeferred<T = void>() {
return { promise, resolve, reject };
}
describe("real scenario: config change during message processing", () => {
describe("gateway restart deferral", () => {
let replyErrors: string[] = [];
beforeEach(() => {
@@ -29,12 +33,11 @@ describe("real scenario: config change during message processing", () => {
afterEach(async () => {
vi.restoreAllMocks();
// Wait for any pending microtasks (from markComplete()) to complete
await Promise.resolve();
await flushMicrotasks();
clearAllDispatchers();
});
it("should NOT restart gateway while reply delivery is in flight", async () => {
it("defers restart while reply delivery is in flight", async () => {
let rpcConnected = true;
const deliveredReplies: string[] = [];
const deliveryStarted = createDeferred();
@@ -53,7 +56,7 @@ describe("real scenario: config change during message processing", () => {
deliveredReplies.push(payload.text ?? "");
},
onError: () => {
// Swallow delivery errors so the test can assert on replyErrors
// Swallow delivery errors so the test can assert on replyErrors.
},
});
@@ -64,15 +67,11 @@ describe("real scenario: config change during message processing", () => {
dispatcher.markComplete();
await deliveryStarted.promise;
// At this point: markComplete flagged, delivery is in flight.
// pending > 0 because the in-flight delivery keeps it alive.
const pendingDuringDelivery = getTotalPendingReplies();
expect(pendingDuringDelivery).toBeGreaterThan(0);
// At this point: delivery is in flight; pending > 0 prevents restart.
expect(getTotalPendingReplies()).toBeGreaterThan(0);
// Simulate restart checks while delivery is in progress.
// If the tracking is broken, pending would be 0 and we'd restart.
let restartTriggered = false;
for (let i = 0; i < 3; i++) {
for (let i = 0; i < 3; i += 1) {
await Promise.resolve();
const pending = getTotalPendingReplies();
if (pending === 0) {
@@ -83,54 +82,83 @@ describe("real scenario: config change during message processing", () => {
}
allowDelivery.resolve();
// Wait for delivery to complete
await dispatcher.waitForIdle();
// Now pending should be 0 — restart can proceed
expect(getTotalPendingReplies()).toBe(0);
// CRITICAL: delivery must have succeeded without RPC being killed
expect(restartTriggered).toBe(false);
expect(replyErrors).toEqual([]);
expect(deliveredReplies).toEqual(["Configuration updated!"]);
});
it("should keep pending > 0 until reply is actually enqueued", async () => {
it("keeps pending > 0 until the reply is actually enqueued", async () => {
const allowDelivery = createDeferred();
const dispatcher = createReplyDispatcher({
deliver: async (_payload) => {
deliver: async () => {
await allowDelivery.promise;
},
});
// Initially: pending=1 (reservation)
expect(getTotalPendingReplies()).toBe(1);
// Simulate command processing delay BEFORE reply is enqueued
await Promise.resolve();
// During this delay, pending should STILL be 1 (reservation active)
expect(getTotalPendingReplies()).toBe(1);
// Now enqueue reply
dispatcher.sendFinalReply({ text: "Reply" });
// Now pending should be 2 (reservation + reply)
expect(getTotalPendingReplies()).toBe(2);
// Mark complete
dispatcher.markComplete();
// After markComplete, pending should still be > 0 if reply hasn't sent yet
const pendingAfterMarkComplete = getTotalPendingReplies();
expect(pendingAfterMarkComplete).toBeGreaterThan(0);
expect(getTotalPendingReplies()).toBeGreaterThan(0);
allowDelivery.resolve();
// Wait for reply to send
await dispatcher.waitForIdle();
expect(getTotalPendingReplies()).toBe(0);
});
it("defers restart until reply dispatcher completes", async () => {
const deliveredReplies: string[] = [];
const dispatcher = createReplyDispatcher({
deliver: async (payload) => {
await Promise.resolve();
deliveredReplies.push(payload.text ?? "");
},
onError: (err) => {
throw err;
},
});
expect(getTotalPendingReplies()).toBe(1);
dispatcher.sendFinalReply({ text: "Configuration updated successfully!" });
expect(getTotalPendingReplies()).toBe(2);
dispatcher.markComplete();
expect(getTotalPendingReplies()).toBeGreaterThan(0);
await dispatcher.waitForIdle();
// Now pending should be 0
expect(getTotalPendingReplies()).toBe(0);
expect(deliveredReplies).toEqual(["Configuration updated successfully!"]);
expect(getTotalQueueSize()).toBe(0);
});
it("clears dispatcher reservation when no replies were sent", async () => {
let deliverCalled = false;
const dispatcher = createReplyDispatcher({
deliver: async () => {
deliverCalled = true;
},
});
expect(getTotalPendingReplies()).toBe(1);
dispatcher.markComplete();
await flushMicrotasks();
expect(getTotalPendingReplies()).toBe(0);
await dispatcher.waitForIdle();
expect(deliverCalled).toBe(false);
expect(getTotalPendingReplies()).toBe(0);
});
});

View File

@@ -1,50 +0,0 @@
import { describe, expect, test, vi } from "vitest";
import type { RequestFrame } from "./protocol/index.js";
import type { GatewayClient, GatewayRequestContext, RespondFn } from "./server-methods/types.js";
import { handleNodeInvokeResult } from "./server-methods/nodes.handlers.invoke-result.js";
describe("late-arriving invoke results", () => {
test("returns success for unknown invoke ids for both success and error payloads", async () => {
const nodeId = "node-123";
const cases = [
{
id: "unknown-invoke-id-12345",
ok: true,
payloadJSON: JSON.stringify({ result: "late" }),
},
{
id: "another-unknown-invoke-id",
ok: false,
error: { code: "FAILED", message: "test error" },
},
] as const;
for (const params of cases) {
const respond = vi.fn<RespondFn>();
const context = {
nodeRegistry: { handleInvokeResult: () => false },
logGateway: { debug: vi.fn() },
} as unknown as GatewayRequestContext;
const client = {
connect: { device: { id: nodeId } },
} as unknown as GatewayClient;
await handleNodeInvokeResult({
req: { method: "node.invoke.result" } as unknown as RequestFrame,
params: { ...params, nodeId } as unknown as Record<string, unknown>,
client,
isWebchatConnect: () => false,
respond,
context,
});
const [ok, payload, error] = respond.mock.lastCall ?? [];
// Late-arriving results return success instead of error to reduce log noise.
expect(ok).toBe(true);
expect(error).toBeUndefined();
expect(payload?.ok).toBe(true);
expect(payload?.ignored).toBe(true);
}
});
});