fix(agents): bypass pendingDescendantRuns guard for cron announce delivery (#35185)
* fix(agents): bypass pendingDescendantRuns guard for cron announce delivery Standalone cron job completions were blocked from direct channel delivery when the cron run had spawned subagents that were still registered as pending. The pendingDescendantRuns guard exists for live orchestration coordination and should not apply to fire-and-forget cron announce sends. Thread the announceType through the delivery chain and skip both the child-descendant and requester-descendant pending-run guards when the announce originates from a cron job. Closes #34966 * fix: ensure outbound session entry for cron announce with named agents (#32432) Named agents may not have a session entry for their delivery target, causing the announce flow to silently fail (delivered=false, no error). Two fixes: 1. Call ensureOutboundSessionEntry when resolving the cron announce session key so downstream delivery can find channel metadata. 2. Fall back to direct outbound delivery when announce delivery fails to ensure cron output reaches the target channel. Closes #32432 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: guard announce direct-delivery fallback against suppression leaks (#32432) The `!delivered` fallback condition was too broad — it caught intentional suppressions (active subagents, interim messages, SILENT_REPLY_TOKEN) in addition to actual announce delivery failures. Add an `announceDeliveryWasAttempted` flag so the direct-delivery fallback only fires when `runSubagentAnnounceFlow` was actually called and failed. Also remove the redundant `if (route)` guard in `resolveCronAnnounceSessionKey` since `resolved` being truthy guarantees `route` is non-null. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(cron): harden announce synthesis follow-ups --------- Co-authored-by: scoootscooob <zhentongfan@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Security/dependency audit: patch transitive Hono vulnerabilities by pinning `hono` to `4.12.5` and `@hono/node-server` to `1.19.10` in production resolution paths. Thanks @shakkernerd.
|
||||
- Security/dependency audit: bump `tar` to `7.5.10` (from `7.5.9`) to address the high-severity hardlink path traversal advisory (`GHSA-qffp-2rhf-9h96`). Thanks @shakkernerd.
|
||||
- Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3.
|
||||
- Auto-reply/system events: restore runtime system events to the message timeline (`System:` lines), preserve think-hint parsing with prepended events, and carry events into deferred followup/collect/steer-backlog prompts to keep cache behavior stable without dropping queued metadata. (#34794) Thanks @anisoptera.
|
||||
- Security/audit account handling: avoid prototype-chain account IDs in audit validation by using own-property checks for `accounts`. (#34982) Thanks @HOYALIM.
|
||||
- Agents/session usage tracking: preserve accumulated usage metadata on embedded Pi runner error exits so failed turns still update session `totalTokens` from real usage instead of stale prior values. (#34275) thanks @RealKai42.
|
||||
|
||||
@@ -469,6 +469,53 @@ describe("subagent announce formatting", () => {
|
||||
expect(agentSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps cron completion direct delivery even when sibling runs are still active", async () => {
|
||||
sessionStore = {
|
||||
"agent:main:subagent:test": {
|
||||
sessionId: "child-session-cron-direct",
|
||||
},
|
||||
"agent:main:main": {
|
||||
sessionId: "requester-session-cron-direct",
|
||||
},
|
||||
};
|
||||
readLatestAssistantReplyMock.mockResolvedValue("");
|
||||
chatHistoryMock.mockResolvedValueOnce({
|
||||
messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: cron" }] }],
|
||||
});
|
||||
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:main:main" ? 1 : 0,
|
||||
);
|
||||
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:main:main" ? 1 : 0,
|
||||
);
|
||||
subagentRegistryMock.countPendingDescendantRunsExcludingRun.mockImplementation(
|
||||
(sessionKey: string, runId: string) =>
|
||||
sessionKey === "agent:main:main" && runId === "run-direct-cron-active-siblings" ? 1 : 0,
|
||||
);
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
childRunId: "run-direct-cron-active-siblings",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
|
||||
announceType: "cron job",
|
||||
...defaultOutcomeAnnounce,
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
expect(sendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(agentSpy).not.toHaveBeenCalled();
|
||||
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
|
||||
const rawMessage = call?.params?.message;
|
||||
const msg = typeof rawMessage === "string" ? rawMessage : "";
|
||||
expect(call?.params?.channel).toBe("discord");
|
||||
expect(call?.params?.to).toBe("channel:12345");
|
||||
expect(msg).toContain("final answer: cron");
|
||||
expect(msg).not.toContain("There are still 1 active subagent run for this session.");
|
||||
});
|
||||
|
||||
it("suppresses completion delivery when subagent reply is ANNOUNCE_SKIP", async () => {
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
|
||||
@@ -736,6 +736,7 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
bestEffortDeliver?: boolean;
|
||||
completionRouteMode?: "bound" | "fallback" | "hook";
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
announceType?: SubagentAnnounceType;
|
||||
directIdempotencyKey: string;
|
||||
currentRunId?: string;
|
||||
completionDirectOrigin?: DeliveryContext;
|
||||
@@ -778,8 +779,9 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
const forceBoundSessionDirectDelivery =
|
||||
params.spawnMode === "session" &&
|
||||
(params.completionRouteMode === "bound" || params.completionRouteMode === "hook");
|
||||
const forceCronDirectDelivery = params.announceType === "cron job";
|
||||
let shouldSendCompletionDirectly = true;
|
||||
if (!forceBoundSessionDirectDelivery) {
|
||||
if (!forceBoundSessionDirectDelivery && !forceCronDirectDelivery) {
|
||||
let pendingDescendantRuns = 0;
|
||||
try {
|
||||
const { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun } =
|
||||
@@ -919,6 +921,7 @@ async function deliverSubagentAnnouncement(params: {
|
||||
bestEffortDeliver?: boolean;
|
||||
completionRouteMode?: "bound" | "fallback" | "hook";
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
announceType?: SubagentAnnounceType;
|
||||
directIdempotencyKey: string;
|
||||
currentRunId?: string;
|
||||
signal?: AbortSignal;
|
||||
@@ -948,6 +951,7 @@ async function deliverSubagentAnnouncement(params: {
|
||||
completionDirectOrigin: params.completionDirectOrigin,
|
||||
completionRouteMode: params.completionRouteMode,
|
||||
spawnMode: params.spawnMode,
|
||||
announceType: params.announceType,
|
||||
directOrigin: params.directOrigin,
|
||||
requesterIsSubagent: params.requesterIsSubagent,
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
@@ -1233,7 +1237,8 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
} catch {
|
||||
// Best-effort only; fall back to direct announce behavior when unavailable.
|
||||
}
|
||||
if (pendingChildDescendantRuns > 0) {
|
||||
const isCronAnnounce = params.announceType === "cron job";
|
||||
if (pendingChildDescendantRuns > 0 && !isCronAnnounce) {
|
||||
// The finished run still has pending descendant subagents (either active,
|
||||
// or ended but still finishing their own announce and cleanup flow). Defer
|
||||
// announcing this run until descendants fully settle.
|
||||
@@ -1406,6 +1411,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
bestEffortDeliver: params.bestEffortDeliver,
|
||||
completionRouteMode: completionResolution.routeMode,
|
||||
spawnMode: params.spawnMode,
|
||||
announceType,
|
||||
directIdempotencyKey,
|
||||
currentRunId: params.childRunId,
|
||||
signal: params.signal,
|
||||
|
||||
@@ -393,7 +393,7 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("returns ok when announce delivery reports false and best-effort is disabled", async () => {
|
||||
it("falls back to direct delivery when announce reports false and best-effort is disabled", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
|
||||
const deps = createCliDeps();
|
||||
@@ -412,13 +412,12 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
},
|
||||
});
|
||||
|
||||
// Announce delivery failure should not mark a successful agent execution
|
||||
// as error. The execution succeeded; only delivery failed.
|
||||
// When announce delivery fails, the direct-delivery fallback fires
|
||||
// so the message still reaches the target channel.
|
||||
expect(res.status).toBe("ok");
|
||||
expect(res.delivered).toBe(false);
|
||||
expect(res.delivered).toBe(true);
|
||||
expect(res.deliveryAttempted).toBe(true);
|
||||
expect(res.error).toBe("cron announce delivery failed");
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -431,7 +430,7 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns ok when announce flow throws and best-effort is disabled", async () => {
|
||||
it("falls back to direct delivery when announce flow throws and best-effort is disabled", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
|
||||
const deps = createCliDeps();
|
||||
@@ -452,13 +451,12 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
},
|
||||
});
|
||||
|
||||
// Even when announce throws (e.g. "pairing required"), the agent
|
||||
// execution succeeded so the job status should be ok.
|
||||
// When announce throws (e.g. "pairing required"), the direct-delivery
|
||||
// fallback fires so the message still reaches the target channel.
|
||||
expect(res.status).toBe("ok");
|
||||
expect(res.delivered).toBe(false);
|
||||
expect(res.delivered).toBe(true);
|
||||
expect(res.deliveryAttempted).toBe(true);
|
||||
expect(res.error).toContain("pairing required");
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { matchesMessagingToolDeliveryTarget } from "./delivery-dispatch.js";
|
||||
|
||||
// Mock the announce flow dependencies to test the fallback behavior.
|
||||
vi.mock("../../agents/subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: vi.fn(),
|
||||
}));
|
||||
vi.mock("../../agents/subagent-registry.js", () => ({
|
||||
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
|
||||
}));
|
||||
|
||||
describe("matchesMessagingToolDeliveryTarget", () => {
|
||||
it("matches when channel and to agree", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "telegram", to: "123456" },
|
||||
{ channel: "telegram", to: "123456" },
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects when channel differs", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "whatsapp", to: "123456" },
|
||||
{ channel: "telegram", to: "123456" },
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects when to is missing from delivery", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "telegram", to: "123456" },
|
||||
{ channel: "telegram", to: undefined },
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects when channel is missing from delivery", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "telegram", to: "123456" },
|
||||
{ channel: undefined, to: "123456" },
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("strips :topic:NNN suffix from target.to before comparing", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "telegram", to: "-1003597428309:topic:462" },
|
||||
{ channel: "telegram", to: "-1003597428309" },
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("matches when provider is 'message' (generic)", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "message", to: "123456" },
|
||||
{ channel: "telegram", to: "123456" },
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects when accountIds differ", () => {
|
||||
expect(
|
||||
matchesMessagingToolDeliveryTarget(
|
||||
{ provider: "telegram", to: "123456", accountId: "bot-a" },
|
||||
{ channel: "telegram", to: "123456", accountId: "bot-b" },
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveCronDeliveryBestEffort", () => {
|
||||
// Import dynamically to avoid top-level side effects
|
||||
it("returns false by default (no bestEffort set)", async () => {
|
||||
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
|
||||
const job = { delivery: {}, payload: { kind: "agentTurn" } } as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
|
||||
});
|
||||
|
||||
it("returns true when delivery.bestEffort is true", async () => {
|
||||
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
|
||||
const job = { delivery: { bestEffort: true }, payload: { kind: "agentTurn" } } as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true when payload.bestEffortDeliver is true and no delivery.bestEffort", async () => {
|
||||
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
|
||||
const job = {
|
||||
delivery: {},
|
||||
payload: { kind: "agentTurn", bestEffortDeliver: true },
|
||||
} as never;
|
||||
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -7,7 +7,10 @@ import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { resolveAgentMainSessionKey } from "../../config/sessions.js";
|
||||
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
|
||||
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
|
||||
import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js";
|
||||
import {
|
||||
ensureOutboundSessionEntry,
|
||||
resolveOutboundSessionRoute,
|
||||
} from "../../infra/outbound/outbound-session.js";
|
||||
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||
import { logWarn } from "../../logger.js";
|
||||
import type { CronJob, CronRunTelemetry } from "../types.js";
|
||||
@@ -93,7 +96,20 @@ async function resolveCronAnnounceSessionKey(params: {
|
||||
threadId: params.delivery.threadId,
|
||||
});
|
||||
const resolved = route?.sessionKey?.trim();
|
||||
if (resolved) {
|
||||
if (route && resolved) {
|
||||
// Ensure the session entry exists so downstream announce / queue delivery
|
||||
// can look up channel metadata (lastChannel, to, sessionId). Named agents
|
||||
// may not have a session entry for this target yet, causing announce
|
||||
// delivery to silently fail (#32432).
|
||||
await ensureOutboundSessionEntry({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: params.delivery.channel,
|
||||
accountId: params.delivery.accountId,
|
||||
route,
|
||||
}).catch(() => {
|
||||
// Best-effort: don't block delivery on session entry creation.
|
||||
});
|
||||
return resolved;
|
||||
}
|
||||
} catch {
|
||||
@@ -156,6 +172,12 @@ export async function dispatchCronDelivery(
|
||||
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||
let delivered = params.skipMessagingToolDelivery;
|
||||
let deliveryAttempted = params.skipMessagingToolDelivery;
|
||||
// Tracks whether `runSubagentAnnounceFlow` was actually called. Early
|
||||
// returns from `deliverViaAnnounce` (active subagents, interim suppression,
|
||||
// SILENT_REPLY_TOKEN) are intentional suppressions — not delivery failures —
|
||||
// so the direct-delivery fallback must only fire when the announce send was
|
||||
// actually attempted and failed.
|
||||
let announceDeliveryWasAttempted = false;
|
||||
const failDeliveryTarget = (error: string) =>
|
||||
params.withRunSession({
|
||||
status: "error",
|
||||
@@ -313,6 +335,7 @@ export async function dispatchCronDelivery(
|
||||
});
|
||||
}
|
||||
deliveryAttempted = true;
|
||||
announceDeliveryWasAttempted = true;
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: params.agentSessionKey,
|
||||
childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`,
|
||||
@@ -443,6 +466,38 @@ export async function dispatchCronDelivery(
|
||||
} else {
|
||||
const announceResult = await deliverViaAnnounce(params.resolvedDelivery);
|
||||
if (announceResult) {
|
||||
// Fall back to direct delivery only when the announce send was
|
||||
// actually attempted and failed. Early returns from
|
||||
// deliverViaAnnounce (active subagents, interim suppression,
|
||||
// SILENT_REPLY_TOKEN) are intentional suppressions that must NOT
|
||||
// trigger direct delivery — doing so would bypass the suppression
|
||||
// guard and leak partial/stale content to the channel. (#32432)
|
||||
if (announceDeliveryWasAttempted && !delivered && !params.isAborted()) {
|
||||
const directFallback = await deliverViaDirect(params.resolvedDelivery);
|
||||
if (directFallback) {
|
||||
return {
|
||||
result: directFallback,
|
||||
delivered,
|
||||
deliveryAttempted,
|
||||
summary,
|
||||
outputText,
|
||||
synthesizedText,
|
||||
deliveryPayloads,
|
||||
};
|
||||
}
|
||||
// If direct delivery succeeded (returned null without error),
|
||||
// `delivered` has been set to true by deliverViaDirect.
|
||||
if (delivered) {
|
||||
return {
|
||||
delivered,
|
||||
deliveryAttempted,
|
||||
summary,
|
||||
outputText,
|
||||
synthesizedText,
|
||||
deliveryPayloads,
|
||||
};
|
||||
}
|
||||
}
|
||||
return {
|
||||
result: announceResult,
|
||||
delivered,
|
||||
|
||||
Reference in New Issue
Block a user