fix(browser): land PR #27617 relay reconnect resilience
This commit is contained in:
@@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Browser/Chrome extension handshake: bind relay WS message handling before `onopen` and add non-blocking `connect.challenge` response handling for gateway-style handshake frames, avoiding stuck `…` badge states when challenge frames arrive immediately on connect. Landed from contributor PR #22571 by @pandego. (#22553)
|
||||
- Browser/Extension relay init: dedupe concurrent same-port relay startup with shared in-flight initialization promises so callers await one startup lifecycle and receive consistent success/failure results. Landed from contributor PR #21277 by @HOYALIM. (Related #20688)
|
||||
- Browser/Extension relay shutdown: flush pending extension-request timers/rejections during relay `stop()` before socket/server teardown so in-flight extension waits do not survive shutdown windows. Landed from contributor PR #24142 by @kevinWangSheng.
|
||||
- Browser/Extension relay reconnect resilience: keep CDP clients alive across brief MV3 extension disconnect windows, wait briefly for extension reconnect before failing in-flight CDP commands, and only tear down relay target/client state after reconnect grace expires. Landed from contributor PR #27617 by @davidemanuelDEV.
|
||||
- Browser/Route decode hardening: guard malformed percent-encoding in relay target action routes and browser route-param decoding so crafted `%` paths return `400` instead of crashing/unhandled URI decode failures. Landed from contributor PR #11880 by @Yida-Dev.
|
||||
- Feishu/Permission error dispatch: merge sender-name permission notices into the main inbound dispatch so one user message produces one agent turn/reply (instead of a duplicate permission-notice turn), with regression coverage. (#27381) thanks @byungsker.
|
||||
- Feishu/Inbound message metadata: include inbound `message_id` in `BodyForAgent` on a dedicated metadata line so agents can reliably correlate and act on media/message operations that require message IDs, with regression coverage. (#27253) thanks @xss925175263.
|
||||
|
||||
@@ -27,6 +27,22 @@ function waitForError(ws: WebSocket) {
|
||||
});
|
||||
}
|
||||
|
||||
function waitForClose(ws: WebSocket, timeoutMs = RELAY_MESSAGE_TIMEOUT_MS) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
reject(new Error("timeout"));
|
||||
}, timeoutMs);
|
||||
ws.once("close", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
ws.once("error", (err) => {
|
||||
clearTimeout(timer);
|
||||
reject(err instanceof Error ? err : new Error(String(err)));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function relayAuthHeaders(url: string) {
|
||||
return getChromeExtensionRelayAuthHeaders(url);
|
||||
}
|
||||
@@ -132,8 +148,14 @@ describe("chrome extension relay server", () => {
|
||||
let envSnapshot: ReturnType<typeof captureEnv>;
|
||||
|
||||
beforeEach(() => {
|
||||
envSnapshot = captureEnv(["OPENCLAW_GATEWAY_TOKEN"]);
|
||||
envSnapshot = captureEnv([
|
||||
"OPENCLAW_GATEWAY_TOKEN",
|
||||
"OPENCLAW_EXTENSION_RELAY_RECONNECT_GRACE_MS",
|
||||
"OPENCLAW_EXTENSION_RELAY_COMMAND_RECONNECT_WAIT_MS",
|
||||
]);
|
||||
process.env.OPENCLAW_GATEWAY_TOKEN = TEST_GATEWAY_TOKEN;
|
||||
delete process.env.OPENCLAW_EXTENSION_RELAY_RECONNECT_GRACE_MS;
|
||||
delete process.env.OPENCLAW_EXTENSION_RELAY_COMMAND_RECONNECT_WAIT_MS;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
@@ -341,6 +363,97 @@ describe("chrome extension relay server", () => {
|
||||
ext2.close();
|
||||
});
|
||||
|
||||
it("keeps CDP clients alive across a brief extension reconnect", async () => {
|
||||
const { port, ext: ext1 } = await startRelayWithExtension();
|
||||
const cdp = new WebSocket(`ws://127.0.0.1:${port}/cdp`, {
|
||||
headers: relayAuthHeaders(`ws://127.0.0.1:${port}/cdp`),
|
||||
});
|
||||
await waitForOpen(cdp);
|
||||
|
||||
let cdpClosed = false;
|
||||
cdp.once("close", () => {
|
||||
cdpClosed = true;
|
||||
});
|
||||
|
||||
const ext1Closed = waitForClose(ext1, 2_000);
|
||||
ext1.close();
|
||||
await ext1Closed;
|
||||
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
const ext2 = new WebSocket(`ws://127.0.0.1:${port}/extension`, {
|
||||
headers: relayAuthHeaders(`ws://127.0.0.1:${port}/extension`),
|
||||
});
|
||||
await waitForOpen(ext2);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
expect(cdpClosed).toBe(false);
|
||||
|
||||
cdp.close();
|
||||
ext2.close();
|
||||
});
|
||||
|
||||
it("waits briefly for extension reconnect before failing CDP commands", async () => {
|
||||
const { port, ext: ext1 } = await startRelayWithExtension();
|
||||
const cdp = new WebSocket(`ws://127.0.0.1:${port}/cdp`, {
|
||||
headers: relayAuthHeaders(`ws://127.0.0.1:${port}/cdp`),
|
||||
});
|
||||
await waitForOpen(cdp);
|
||||
const cdpQueue = createMessageQueue(cdp);
|
||||
|
||||
const ext1Closed = waitForClose(ext1, 2_000);
|
||||
ext1.close();
|
||||
await ext1Closed;
|
||||
|
||||
cdp.send(JSON.stringify({ id: 41, method: "Runtime.enable" }));
|
||||
await new Promise((r) => setTimeout(r, 150));
|
||||
|
||||
const ext2 = new WebSocket(`ws://127.0.0.1:${port}/extension`, {
|
||||
headers: relayAuthHeaders(`ws://127.0.0.1:${port}/extension`),
|
||||
});
|
||||
const ext2Queue = createMessageQueue(ext2);
|
||||
await waitForOpen(ext2);
|
||||
|
||||
while (true) {
|
||||
const msg = JSON.parse(await ext2Queue.next(4_000)) as {
|
||||
id?: number;
|
||||
method?: string;
|
||||
};
|
||||
if (msg.method === "ping") {
|
||||
ext2.send(JSON.stringify({ method: "pong" }));
|
||||
continue;
|
||||
}
|
||||
if (msg.method === "forwardCDPCommand" && typeof msg.id === "number") {
|
||||
ext2.send(JSON.stringify({ id: msg.id, result: { ok: true } }));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const response = JSON.parse(await cdpQueue.next(6_000)) as {
|
||||
id?: number;
|
||||
result?: { ok?: boolean };
|
||||
error?: { message?: string };
|
||||
};
|
||||
expect(response.id).toBe(41);
|
||||
expect(response.error).toBeUndefined();
|
||||
expect(response.result?.ok).toBe(true);
|
||||
|
||||
cdp.close();
|
||||
ext2.close();
|
||||
});
|
||||
|
||||
it("closes CDP clients after reconnect grace when extension stays disconnected", async () => {
|
||||
process.env.OPENCLAW_EXTENSION_RELAY_RECONNECT_GRACE_MS = "150";
|
||||
|
||||
const { port, ext } = await startRelayWithExtension();
|
||||
const cdp = new WebSocket(`ws://127.0.0.1:${port}/cdp`, {
|
||||
headers: relayAuthHeaders(`ws://127.0.0.1:${port}/cdp`),
|
||||
});
|
||||
await waitForOpen(cdp);
|
||||
|
||||
ext.close();
|
||||
await waitForClose(cdp, 2_000);
|
||||
});
|
||||
|
||||
it("accepts extension websocket access with relay token query param", async () => {
|
||||
const port = await getFreePort();
|
||||
cdpUrl = `http://127.0.0.1:${port}`;
|
||||
|
||||
@@ -82,6 +82,8 @@ type ConnectedTarget = {
|
||||
};
|
||||
|
||||
const RELAY_AUTH_HEADER = "x-openclaw-relay-token";
|
||||
const DEFAULT_EXTENSION_RECONNECT_GRACE_MS = 5_000;
|
||||
const DEFAULT_EXTENSION_COMMAND_RECONNECT_WAIT_MS = 3_000;
|
||||
|
||||
function headerValue(value: string | string[] | undefined): string | undefined {
|
||||
if (!value) {
|
||||
@@ -171,6 +173,18 @@ function rejectUpgrade(socket: Duplex, status: number, bodyText: string) {
|
||||
}
|
||||
}
|
||||
|
||||
function envMsOrDefault(name: string, fallback: number): number {
|
||||
const raw = process.env[name];
|
||||
if (!raw || raw.trim() === "") {
|
||||
return fallback;
|
||||
}
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
return fallback;
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
const relayRuntimeByPort = new Map<number, RelayRuntime>();
|
||||
const relayInitByPort = new Map<number, Promise<ChromeExtensionRelayServer>>();
|
||||
|
||||
@@ -225,6 +239,15 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
return await inFlight;
|
||||
}
|
||||
|
||||
const extensionReconnectGraceMs = envMsOrDefault(
|
||||
"OPENCLAW_EXTENSION_RELAY_RECONNECT_GRACE_MS",
|
||||
DEFAULT_EXTENSION_RECONNECT_GRACE_MS,
|
||||
);
|
||||
const extensionCommandReconnectWaitMs = envMsOrDefault(
|
||||
"OPENCLAW_EXTENSION_RELAY_COMMAND_RECONNECT_WAIT_MS",
|
||||
DEFAULT_EXTENSION_COMMAND_RECONNECT_WAIT_MS,
|
||||
);
|
||||
|
||||
const initPromise = (async (): Promise<ChromeExtensionRelayServer> => {
|
||||
const relayAuthToken = resolveRelayAuthTokenForPort(info.port);
|
||||
const relayAuthTokens = new Set(resolveRelayAcceptedTokensForPort(info.port));
|
||||
@@ -233,6 +256,73 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
const cdpClients = new Set<WebSocket>();
|
||||
const connectedTargets = new Map<string, ConnectedTarget>();
|
||||
const extensionConnected = () => extensionWs?.readyState === WebSocket.OPEN;
|
||||
let extensionDisconnectCleanupTimer: NodeJS.Timeout | null = null;
|
||||
const extensionReconnectWaiters = new Set<(connected: boolean) => void>();
|
||||
|
||||
const flushExtensionReconnectWaiters = (connected: boolean) => {
|
||||
if (extensionReconnectWaiters.size === 0) {
|
||||
return;
|
||||
}
|
||||
const waiters = Array.from(extensionReconnectWaiters);
|
||||
extensionReconnectWaiters.clear();
|
||||
for (const waiter of waiters) {
|
||||
waiter(connected);
|
||||
}
|
||||
};
|
||||
|
||||
const clearExtensionDisconnectCleanupTimer = () => {
|
||||
if (!extensionDisconnectCleanupTimer) {
|
||||
return;
|
||||
}
|
||||
clearTimeout(extensionDisconnectCleanupTimer);
|
||||
extensionDisconnectCleanupTimer = null;
|
||||
};
|
||||
|
||||
const closeCdpClientsAfterExtensionDisconnect = () => {
|
||||
connectedTargets.clear();
|
||||
for (const client of cdpClients) {
|
||||
try {
|
||||
client.close(1011, "extension disconnected");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
cdpClients.clear();
|
||||
flushExtensionReconnectWaiters(false);
|
||||
};
|
||||
|
||||
const scheduleExtensionDisconnectCleanup = () => {
|
||||
clearExtensionDisconnectCleanupTimer();
|
||||
extensionDisconnectCleanupTimer = setTimeout(() => {
|
||||
extensionDisconnectCleanupTimer = null;
|
||||
if (extensionConnected()) {
|
||||
return;
|
||||
}
|
||||
closeCdpClientsAfterExtensionDisconnect();
|
||||
}, extensionReconnectGraceMs);
|
||||
};
|
||||
|
||||
const waitForExtensionReconnect = async (timeoutMs: number): Promise<boolean> => {
|
||||
if (extensionConnected()) {
|
||||
return true;
|
||||
}
|
||||
return await new Promise<boolean>((resolve) => {
|
||||
let settled = false;
|
||||
const waiter = (connected: boolean) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
extensionReconnectWaiters.delete(waiter);
|
||||
resolve(connected);
|
||||
};
|
||||
const timer = setTimeout(() => {
|
||||
waiter(false);
|
||||
}, timeoutMs);
|
||||
extensionReconnectWaiters.add(waiter);
|
||||
});
|
||||
};
|
||||
|
||||
const pendingExtension = new Map<
|
||||
number,
|
||||
@@ -543,10 +633,6 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
rejectUpgrade(socket, 401, "Unauthorized");
|
||||
return;
|
||||
}
|
||||
if (extensionConnected()) {
|
||||
rejectUpgrade(socket, 409, "Extension already connected");
|
||||
return;
|
||||
}
|
||||
// MV3 worker reconnect races can leave a stale non-OPEN socket reference.
|
||||
if (extensionWs && extensionWs.readyState !== WebSocket.OPEN) {
|
||||
try {
|
||||
@@ -556,6 +642,10 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
}
|
||||
extensionWs = null;
|
||||
}
|
||||
if (extensionConnected()) {
|
||||
rejectUpgrade(socket, 409, "Extension already connected");
|
||||
return;
|
||||
}
|
||||
wssExtension.handleUpgrade(req, socket, head, (ws) => {
|
||||
wssExtension.emit("connection", ws, req);
|
||||
});
|
||||
@@ -583,6 +673,8 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
|
||||
wssExtension.on("connection", (ws) => {
|
||||
extensionWs = ws;
|
||||
clearExtensionDisconnectCleanupTimer();
|
||||
flushExtensionReconnectWaiters(true);
|
||||
|
||||
const ping = setInterval(() => {
|
||||
if (ws.readyState !== WebSocket.OPEN) {
|
||||
@@ -710,16 +802,7 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
pending.reject(new Error("extension disconnected"));
|
||||
}
|
||||
pendingExtension.clear();
|
||||
connectedTargets.clear();
|
||||
|
||||
for (const client of cdpClients) {
|
||||
try {
|
||||
client.close(1011, "extension disconnected");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
cdpClients.clear();
|
||||
scheduleExtensionDisconnectCleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -741,12 +824,15 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
}
|
||||
|
||||
if (!extensionConnected()) {
|
||||
sendResponseToCdp(ws, {
|
||||
id: cmd.id,
|
||||
sessionId: cmd.sessionId,
|
||||
error: { message: "Extension not connected" },
|
||||
});
|
||||
return;
|
||||
const reconnected = await waitForExtensionReconnect(extensionCommandReconnectWaitMs);
|
||||
if (!reconnected || !extensionConnected()) {
|
||||
sendResponseToCdp(ws, {
|
||||
id: cmd.id,
|
||||
sessionId: cmd.sessionId,
|
||||
error: { message: "Extension not connected" },
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -841,6 +927,8 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
extensionConnected,
|
||||
stop: async () => {
|
||||
relayRuntimeByPort.delete(port);
|
||||
clearExtensionDisconnectCleanupTimer();
|
||||
flushExtensionReconnectWaiters(false);
|
||||
for (const [, pending] of pendingExtension) {
|
||||
clearTimeout(pending.timer);
|
||||
pending.reject(new Error("server stopping"));
|
||||
|
||||
Reference in New Issue
Block a user