fix(gateway): fail fast exec approvals when no approvers are reachable
Co-authored-by: fanxian831-netizen <262880470+fanxian831-netizen@users.noreply.github.com>
This commit is contained in:
@@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Plugins/Media sandbox: propagate trusted `mediaLocalRoots` through plugin action dispatch (including Discord/Telegram action adapters) so plugin send paths enforce the same agent-scoped local-media sandbox roots as core outbound sends. (#20258, #22718)
|
||||
- Agents/Workspace guard: map sandbox container-workdir file-tool paths (for example `/workspace/...` and `file:///workspace/...`) to host workspace roots before workspace-only validation, preventing false `Path escapes sandbox root` rejections for sandbox file tools. (#9560)
|
||||
- Gateway/Exec approvals: expire approval requests immediately when no approval-capable gateway clients are connected and no forwarding targets are available, avoiding delayed approvals after restarts/offline approver windows. (#22144)
|
||||
- Slack/Threading: sessions: keep parent-session forking and thread-history context active beyond first turn by removing first-turn-only gates in session init, thread-history fetch, and reply prompt context injection. (#23843, #23090) Thanks @vincentkoc and @Taskle.
|
||||
- Slack/Threading: respect `replyToMode` when Slack auto-populates top-level `thread_ts`, and ignore inline `replyToId` directive tags when `replyToMode` is `off` so thread forcing stays disabled unless explicitly configured. (#23839, #23320, #23513) Thanks @vincentkoc and @dorukardahan.
|
||||
- Slack/Extension: forward `message read` `threadId` to `readMessages` and use delivery-context `threadId` as outbound `thread_ts` fallback so extension replies/reads stay in the correct Slack thread. (#22216, #22485, #23836) Thanks @vincentkoc, @lan17 and @dorukardahan.
|
||||
|
||||
@@ -86,18 +86,7 @@ export class ExecApprovalManager {
|
||||
promise,
|
||||
};
|
||||
entry.timer = setTimeout(() => {
|
||||
// Update snapshot fields before resolving (mirror resolve()'s bookkeeping)
|
||||
record.resolvedAtMs = Date.now();
|
||||
record.decision = undefined;
|
||||
record.resolvedBy = null;
|
||||
resolvePromise(null);
|
||||
// Keep entry briefly for in-flight awaitDecision calls
|
||||
setTimeout(() => {
|
||||
// Compare against captured entry instance, not re-fetched from map
|
||||
if (this.pending.get(record.id) === entry) {
|
||||
this.pending.delete(record.id);
|
||||
}
|
||||
}, RESOLVED_ENTRY_GRACE_MS);
|
||||
this.expire(record.id);
|
||||
}, timeoutMs);
|
||||
this.pending.set(record.id, entry);
|
||||
return promise;
|
||||
@@ -138,6 +127,27 @@ export class ExecApprovalManager {
|
||||
return true;
|
||||
}
|
||||
|
||||
expire(recordId: string, resolvedBy?: string | null): boolean {
|
||||
const pending = this.pending.get(recordId);
|
||||
if (!pending) {
|
||||
return false;
|
||||
}
|
||||
if (pending.record.resolvedAtMs !== undefined) {
|
||||
return false;
|
||||
}
|
||||
clearTimeout(pending.timer);
|
||||
pending.record.resolvedAtMs = Date.now();
|
||||
pending.record.decision = undefined;
|
||||
pending.record.resolvedBy = resolvedBy ?? null;
|
||||
pending.resolve(null);
|
||||
setTimeout(() => {
|
||||
if (this.pending.get(recordId) === pending) {
|
||||
this.pending.delete(recordId);
|
||||
}
|
||||
}, RESOLVED_ENTRY_GRACE_MS);
|
||||
return true;
|
||||
}
|
||||
|
||||
getSnapshot(recordId: string): ExecApprovalRecord | null {
|
||||
const entry = this.pending.get(recordId);
|
||||
return entry?.record ?? null;
|
||||
|
||||
@@ -17,6 +17,14 @@ export function createExecApprovalHandlers(
|
||||
manager: ExecApprovalManager,
|
||||
opts?: { forwarder?: ExecApprovalForwarder },
|
||||
): GatewayRequestHandlers {
|
||||
const hasApprovalClients = (context: { hasExecApprovalClients?: () => boolean }) => {
|
||||
if (typeof context.hasExecApprovalClients === "function") {
|
||||
return context.hasExecApprovalClients();
|
||||
}
|
||||
// Fail closed when no operator-scope probe is available.
|
||||
return false;
|
||||
};
|
||||
|
||||
return {
|
||||
"exec.approval.request": async ({ params, respond, context, client }) => {
|
||||
if (!validateExecApprovalRequestParams(params)) {
|
||||
@@ -96,16 +104,23 @@ export function createExecApprovalHandlers(
|
||||
},
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
void opts?.forwarder
|
||||
?.handleRequested({
|
||||
id: record.id,
|
||||
request: record.request,
|
||||
createdAtMs: record.createdAtMs,
|
||||
expiresAtMs: record.expiresAtMs,
|
||||
})
|
||||
.catch((err) => {
|
||||
let forwardedToTargets = false;
|
||||
if (opts?.forwarder) {
|
||||
try {
|
||||
forwardedToTargets = await opts.forwarder.handleRequested({
|
||||
id: record.id,
|
||||
request: record.request,
|
||||
createdAtMs: record.createdAtMs,
|
||||
expiresAtMs: record.expiresAtMs,
|
||||
});
|
||||
} catch (err) {
|
||||
context.logGateway?.error?.(`exec approvals: forward request failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasApprovalClients(context) && !forwardedToTargets) {
|
||||
manager.expire(record.id, "auto-expire:no-approver-clients");
|
||||
}
|
||||
|
||||
// Only send immediate "accepted" response when twoPhase is requested.
|
||||
// This preserves single-response semantics for existing callers.
|
||||
|
||||
@@ -254,6 +254,7 @@ describe("exec approval handlers", () => {
|
||||
|
||||
function toExecApprovalRequestContext(context: {
|
||||
broadcast: (event: string, payload: unknown) => void;
|
||||
hasExecApprovalClients?: () => boolean;
|
||||
}): ExecApprovalRequestArgs["context"] {
|
||||
return context as unknown as ExecApprovalRequestArgs["context"];
|
||||
}
|
||||
@@ -277,7 +278,10 @@ describe("exec approval handlers", () => {
|
||||
return params.handlers["exec.approval.request"]({
|
||||
params: requestParams,
|
||||
respond: params.respond as unknown as ExecApprovalRequestArgs["respond"],
|
||||
context: toExecApprovalRequestContext(params.context),
|
||||
context: toExecApprovalRequestContext({
|
||||
hasExecApprovalClients: () => true,
|
||||
...params.context,
|
||||
}),
|
||||
client: null,
|
||||
req: { id: "req-1", type: "req", method: "exec.approval.request" },
|
||||
isWebchatConnect: execApprovalNoop,
|
||||
@@ -309,6 +313,7 @@ describe("exec approval handlers", () => {
|
||||
broadcast: (event: string, payload: unknown) => {
|
||||
broadcasts.push({ event, payload });
|
||||
},
|
||||
hasExecApprovalClients: () => true,
|
||||
};
|
||||
return { handlers, broadcasts, respond, context };
|
||||
}
|
||||
@@ -463,6 +468,46 @@ describe("exec approval handlers", () => {
|
||||
);
|
||||
expect(resolveRespond).toHaveBeenCalledWith(true, { ok: true }, undefined);
|
||||
});
|
||||
|
||||
it("expires immediately when no approver clients and no forwarding targets", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const manager = new ExecApprovalManager();
|
||||
const forwarder = {
|
||||
handleRequested: vi.fn(async () => false),
|
||||
handleResolved: vi.fn(async () => {}),
|
||||
stop: vi.fn(),
|
||||
};
|
||||
const handlers = createExecApprovalHandlers(manager, { forwarder });
|
||||
const respond = vi.fn();
|
||||
const context = {
|
||||
broadcast: (_event: string, _payload: unknown) => {},
|
||||
hasExecApprovalClients: () => false,
|
||||
};
|
||||
const expireSpy = vi.spyOn(manager, "expire");
|
||||
|
||||
const requestPromise = requestExecApproval({
|
||||
handlers,
|
||||
respond,
|
||||
context,
|
||||
params: { timeoutMs: 60_000 },
|
||||
});
|
||||
for (let idx = 0; idx < 20; idx += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
expect(forwarder.handleRequested).toHaveBeenCalledTimes(1);
|
||||
expect(expireSpy).toHaveBeenCalledTimes(1);
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await requestPromise;
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({ decision: null }),
|
||||
undefined,
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway healthHandlers.status scope handling", () => {
|
||||
|
||||
@@ -47,6 +47,7 @@ export type GatewayRequestContext = {
|
||||
nodeUnsubscribe: (nodeId: string, sessionKey: string) => void;
|
||||
nodeUnsubscribeAll: (nodeId: string) => void;
|
||||
hasConnectedMobileNode: () => boolean;
|
||||
hasExecApprovalClients?: () => boolean;
|
||||
nodeRegistry: NodeRegistry;
|
||||
agentRunSeq: Map<string, number>;
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
|
||||
@@ -602,6 +602,17 @@ export async function startGatewayServer(
|
||||
nodeUnsubscribe,
|
||||
nodeUnsubscribeAll,
|
||||
hasConnectedMobileNode: hasMobileNodeConnected,
|
||||
hasExecApprovalClients: () => {
|
||||
for (const gatewayClient of clients) {
|
||||
const scopes = Array.isArray(gatewayClient.connect.scopes)
|
||||
? gatewayClient.connect.scopes
|
||||
: [];
|
||||
if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
},
|
||||
nodeRegistry,
|
||||
agentRunSeq,
|
||||
chatAbortControllers,
|
||||
|
||||
@@ -63,7 +63,7 @@ describe("exec approval forwarder", () => {
|
||||
resolveSessionTarget: () => ({ channel: "slack", to: "U1" }),
|
||||
});
|
||||
|
||||
await forwarder.handleRequested(baseRequest);
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true);
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
|
||||
await forwarder.handleResolved({
|
||||
@@ -82,7 +82,7 @@ describe("exec approval forwarder", () => {
|
||||
vi.useFakeTimers();
|
||||
const { deliver, forwarder } = createForwarder({ cfg: TARGETS_CFG });
|
||||
|
||||
await forwarder.handleRequested(baseRequest);
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true);
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
@@ -93,7 +93,7 @@ describe("exec approval forwarder", () => {
|
||||
vi.useFakeTimers();
|
||||
const { deliver, forwarder } = createForwarder({ cfg: TARGETS_CFG });
|
||||
|
||||
await forwarder.handleRequested(baseRequest);
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true);
|
||||
|
||||
expect(getFirstDeliveryText(deliver)).toContain("Command: `echo hello`");
|
||||
});
|
||||
@@ -102,17 +102,50 @@ describe("exec approval forwarder", () => {
|
||||
vi.useFakeTimers();
|
||||
const { deliver, forwarder } = createForwarder({ cfg: TARGETS_CFG });
|
||||
|
||||
await forwarder.handleRequested({
|
||||
...baseRequest,
|
||||
request: {
|
||||
...baseRequest.request,
|
||||
command: "echo `uname`\necho done",
|
||||
},
|
||||
});
|
||||
await expect(
|
||||
forwarder.handleRequested({
|
||||
...baseRequest,
|
||||
request: {
|
||||
...baseRequest.request,
|
||||
command: "echo `uname`\necho done",
|
||||
},
|
||||
}),
|
||||
).resolves.toBe(true);
|
||||
|
||||
expect(getFirstDeliveryText(deliver)).toContain("Command:\n```\necho `uname`\necho done\n```");
|
||||
});
|
||||
|
||||
it("returns false when forwarding is disabled", async () => {
|
||||
const { deliver, forwarder } = createForwarder({
|
||||
cfg: {} as OpenClawConfig,
|
||||
});
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(false);
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns false when all targets are skipped", async () => {
|
||||
vi.useFakeTimers();
|
||||
const cfg = {
|
||||
channels: {
|
||||
discord: {
|
||||
execApprovals: {
|
||||
enabled: true,
|
||||
approvers: ["123"],
|
||||
},
|
||||
},
|
||||
},
|
||||
approvals: { exec: { enabled: true, mode: "session" } },
|
||||
} as OpenClawConfig;
|
||||
|
||||
const { deliver, forwarder } = createForwarder({
|
||||
cfg,
|
||||
resolveSessionTarget: () => ({ channel: "discord", to: "channel:123" }),
|
||||
});
|
||||
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(false);
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("forwards to discord when discord exec approvals handler is disabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
const cfg = {
|
||||
@@ -124,7 +157,7 @@ describe("exec approval forwarder", () => {
|
||||
resolveSessionTarget: () => ({ channel: "discord", to: "channel:123" }),
|
||||
});
|
||||
|
||||
await forwarder.handleRequested(baseRequest);
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true);
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
@@ -148,7 +181,7 @@ describe("exec approval forwarder", () => {
|
||||
resolveSessionTarget: () => ({ channel: "discord", to: "channel:123" }),
|
||||
});
|
||||
|
||||
await forwarder.handleRequested(baseRequest);
|
||||
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(false);
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
@@ -185,13 +218,15 @@ describe("exec approval forwarder", () => {
|
||||
vi.useFakeTimers();
|
||||
const { deliver, forwarder } = createForwarder({ cfg: TARGETS_CFG });
|
||||
|
||||
await forwarder.handleRequested({
|
||||
...baseRequest,
|
||||
request: {
|
||||
...baseRequest.request,
|
||||
command: "echo ```danger```",
|
||||
},
|
||||
});
|
||||
await expect(
|
||||
forwarder.handleRequested({
|
||||
...baseRequest,
|
||||
request: {
|
||||
...baseRequest.request,
|
||||
command: "echo ```danger```",
|
||||
},
|
||||
}),
|
||||
).resolves.toBe(true);
|
||||
|
||||
expect(getFirstDeliveryText(deliver)).toContain("Command:\n````\necho ```danger```\n````");
|
||||
});
|
||||
|
||||
@@ -29,7 +29,7 @@ type PendingApproval = {
|
||||
};
|
||||
|
||||
export type ExecApprovalForwarder = {
|
||||
handleRequested: (request: ExecApprovalRequest) => Promise<void>;
|
||||
handleRequested: (request: ExecApprovalRequest) => Promise<boolean>;
|
||||
handleResolved: (resolved: ExecApprovalResolved) => Promise<void>;
|
||||
stop: () => void;
|
||||
};
|
||||
@@ -318,11 +318,11 @@ export function createExecApprovalForwarder(
|
||||
const resolveSessionTarget = deps.resolveSessionTarget ?? defaultResolveSessionTarget;
|
||||
const pending = new Map<string, PendingApproval>();
|
||||
|
||||
const handleRequested = async (request: ExecApprovalRequest) => {
|
||||
const handleRequested = async (request: ExecApprovalRequest): Promise<boolean> => {
|
||||
const cfg = getConfig();
|
||||
const config = cfg.approvals?.exec;
|
||||
if (!shouldForward({ config, request })) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
const filteredTargets = resolveForwardTargets({
|
||||
cfg,
|
||||
@@ -332,7 +332,7 @@ export function createExecApprovalForwarder(
|
||||
}).filter((target) => !shouldSkipDiscordForwarding(target, cfg));
|
||||
|
||||
if (filteredTargets.length === 0) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
const expiresInMs = Math.max(0, request.expiresAtMs - nowMs());
|
||||
@@ -353,17 +353,20 @@ export function createExecApprovalForwarder(
|
||||
pending.set(request.id, pendingEntry);
|
||||
|
||||
if (pending.get(request.id) !== pendingEntry) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
const text = buildRequestMessage(request, nowMs());
|
||||
await deliverToTargets({
|
||||
void deliverToTargets({
|
||||
cfg,
|
||||
targets: filteredTargets,
|
||||
text,
|
||||
deliver,
|
||||
shouldSend: () => pending.get(request.id) === pendingEntry,
|
||||
}).catch((err) => {
|
||||
log.error(`exec approvals: failed to deliver request ${request.id}: ${String(err)}`);
|
||||
});
|
||||
return true;
|
||||
};
|
||||
|
||||
const handleResolved = async (resolved: ExecApprovalResolved) => {
|
||||
|
||||
Reference in New Issue
Block a user