From 7a40d99b1d4d6e3eee590088e27cb33aa4547daa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 23 Feb 2026 19:24:58 +0000 Subject: [PATCH] refactor(cron): extract delivery dispatch + harden reset notices --- .../reply/get-reply-run.media-only.test.ts | 28 ++ src/cron/isolated-agent/delivery-dispatch.ts | 414 ++++++++++++++++++ src/cron/isolated-agent/run.ts | 341 ++------------- 3 files changed, 481 insertions(+), 302 deletions(-) create mode 100644 src/cron/isolated-agent/delivery-dispatch.ts diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index e0d9be246..d4a40b4ed 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -221,4 +221,32 @@ describe("runPreparedReply media-only handling", () => { expect(resetNoticeCall?.payload?.text).not.toContain("api-key"); expect(resetNoticeCall?.payload?.text).not.toContain("env:"); }); + + it("skips reset notice when only webchat fallback routing is available", async () => { + await runPreparedReply( + baseParams({ + resetTriggered: true, + ctx: { + Body: "", + RawBody: "", + CommandBody: "", + ThreadHistoryBody: "Earlier message in this thread", + OriginatingChannel: undefined, + OriginatingTo: undefined, + ChatType: "group", + }, + command: { + isAuthorizedSender: true, + abortKey: "session-key", + ownerList: [], + senderIsOwner: false, + channel: "webchat", + from: undefined, + to: undefined, + } as never, + }), + ); + + expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); + }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts new file mode 100644 index 000000000..697c0e2b8 --- /dev/null +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -0,0 +1,414 @@ +import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; +import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; +import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { resolveAgentMainSessionKey } from "../../config/sessions.js"; +import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; +import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; +import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; +import { logWarn } from "../../logger.js"; +import type { CronJob, CronRunTelemetry } from "../types.js"; +import type { DeliveryTargetResolution } from "./delivery-target.js"; +import { pickSummaryFromOutput } from "./helpers.js"; +import type { RunCronAgentTurnResult } from "./run.js"; +import { + expectsSubagentFollowup, + isLikelyInterimCronMessage, + readDescendantSubagentFallbackReply, + waitForDescendantSubagentSummary, +} from "./subagent-followup.js"; + +export function matchesMessagingToolDeliveryTarget( + target: { provider?: string; to?: string; accountId?: string }, + delivery: { channel?: string; to?: string; accountId?: string }, +): boolean { + if (!delivery.channel || !delivery.to || !target.to) { + return false; + } + const channel = delivery.channel.trim().toLowerCase(); + const provider = target.provider?.trim().toLowerCase(); + if (provider && provider !== "message" && provider !== channel) { + return false; + } + if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) { + return false; + } + return target.to === delivery.to; +} + +export function resolveCronDeliveryBestEffort(job: CronJob): boolean { + if (typeof job.delivery?.bestEffort === "boolean") { + return job.delivery.bestEffort; + } + if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") { + return job.payload.bestEffortDeliver; + } + return false; +} + +async function resolveCronAnnounceSessionKey(params: { + cfg: OpenClawConfig; + agentId: string; + fallbackSessionKey: string; + delivery: { + channel: NonNullable; + to?: string; + accountId?: string; + threadId?: string | number; + }; +}): Promise { + const to = params.delivery.to?.trim(); + if (!to) { + return params.fallbackSessionKey; + } + try { + const route = await resolveOutboundSessionRoute({ + cfg: params.cfg, + channel: params.delivery.channel, + agentId: params.agentId, + accountId: params.delivery.accountId, + target: to, + threadId: params.delivery.threadId, + }); + const resolved = route?.sessionKey?.trim(); + if (resolved) { + return resolved; + } + } catch { + // Fall back to main session routing if announce session resolution fails. + } + return params.fallbackSessionKey; +} + +export type SuccessfulDeliveryTarget = Extract; + +type DispatchCronDeliveryParams = { + cfg: OpenClawConfig; + cfgWithAgentDefaults: OpenClawConfig; + deps: CliDeps; + job: CronJob; + agentId: string; + agentSessionKey: string; + runSessionId: string; + runStartedAt: number; + runEndedAt: number; + timeoutMs: number; + resolvedDelivery: DeliveryTargetResolution; + deliveryRequested: boolean; + skipHeartbeatDelivery: boolean; + skipMessagingToolDelivery: boolean; + deliveryBestEffort: boolean; + deliveryPayloadHasStructuredContent: boolean; + deliveryPayloads: ReplyPayload[]; + synthesizedText?: string; + summary?: string; + outputText?: string; + telemetry?: CronRunTelemetry; + abortSignal?: AbortSignal; + isAborted: () => boolean; + abortReason: () => string; + withRunSession: ( + result: Omit, + ) => RunCronAgentTurnResult; +}; + +export type DispatchCronDeliveryState = { + result?: RunCronAgentTurnResult; + delivered: boolean; + summary?: string; + outputText?: string; + synthesizedText?: string; + deliveryPayloads: ReplyPayload[]; +}; + +export async function dispatchCronDelivery( + params: DispatchCronDeliveryParams, +): Promise { + let summary = params.summary; + let outputText = params.outputText; + let synthesizedText = params.synthesizedText; + let deliveryPayloads = params.deliveryPayloads; + + // `true` means we confirmed at least one outbound send reached the target. + // Keep this strict so timer fallback can safely decide whether to wake main. + let delivered = params.skipMessagingToolDelivery; + const failDeliveryTarget = (error: string) => + params.withRunSession({ + status: "error", + error, + errorKind: "delivery-target", + summary, + outputText, + ...params.telemetry, + }); + + const deliverViaDirect = async ( + delivery: SuccessfulDeliveryTarget, + ): Promise => { + const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId); + try { + const payloadsForDelivery = + deliveryPayloads.length > 0 + ? deliveryPayloads + : synthesizedText + ? [{ text: synthesizedText }] + : []; + if (payloadsForDelivery.length === 0) { + return null; + } + if (params.isAborted()) { + return params.withRunSession({ + status: "error", + error: params.abortReason(), + ...params.telemetry, + }); + } + const deliveryResults = await deliverOutboundPayloads({ + cfg: params.cfgWithAgentDefaults, + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + payloads: payloadsForDelivery, + agentId: params.agentId, + identity, + bestEffort: params.deliveryBestEffort, + deps: createOutboundSendDeps(params.deps), + abortSignal: params.abortSignal, + }); + delivered = deliveryResults.length > 0; + return null; + } catch (err) { + if (!params.deliveryBestEffort) { + return params.withRunSession({ + status: "error", + summary, + outputText, + error: String(err), + ...params.telemetry, + }); + } + return null; + } + }; + + const deliverViaAnnounce = async ( + delivery: SuccessfulDeliveryTarget, + ): Promise => { + if (!synthesizedText) { + return null; + } + const announceMainSessionKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + }); + const announceSessionKey = await resolveCronAnnounceSessionKey({ + cfg: params.cfgWithAgentDefaults, + agentId: params.agentId, + fallbackSessionKey: announceMainSessionKey, + delivery: { + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + }, + }); + const taskLabel = + typeof params.job.name === "string" && params.job.name.trim() + ? params.job.name.trim() + : `cron:${params.job.id}`; + const initialSynthesizedText = synthesizedText.trim(); + let activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey); + const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); + const hadActiveDescendants = activeSubagentRuns > 0; + if (activeSubagentRuns > 0 || expectedSubagentFollowup) { + let finalReply = await waitForDescendantSubagentSummary({ + sessionKey: params.agentSessionKey, + initialReply: initialSynthesizedText, + timeoutMs: params.timeoutMs, + observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup, + }); + activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey); + if ( + !finalReply && + activeSubagentRuns === 0 && + (hadActiveDescendants || expectedSubagentFollowup) + ) { + finalReply = await readDescendantSubagentFallbackReply({ + sessionKey: params.agentSessionKey, + runStartedAt: params.runStartedAt, + }); + } + if (finalReply && activeSubagentRuns === 0) { + outputText = finalReply; + summary = pickSummaryFromOutput(finalReply) ?? summary; + synthesizedText = finalReply; + deliveryPayloads = [{ text: finalReply }]; + } + } + if (activeSubagentRuns > 0) { + // Parent orchestration is still in progress; avoid announcing a partial + // update to the main requester. + return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry }); + } + if ( + (hadActiveDescendants || expectedSubagentFollowup) && + synthesizedText.trim() === initialSynthesizedText && + isLikelyInterimCronMessage(initialSynthesizedText) && + initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() + ) { + // Descendants existed but no post-orchestration synthesis arrived, so + // suppress stale parent text like "on it, pulling everything together". + return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry }); + } + if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { + return params.withRunSession({ + status: "ok", + summary, + outputText, + delivered: true, + ...params.telemetry, + }); + } + try { + if (params.isAborted()) { + return params.withRunSession({ + status: "error", + error: params.abortReason(), + ...params.telemetry, + }); + } + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: params.agentSessionKey, + childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`, + requesterSessionKey: announceSessionKey, + requesterOrigin: { + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + }, + requesterDisplayKey: announceSessionKey, + task: taskLabel, + timeoutMs: params.timeoutMs, + cleanup: params.job.deleteAfterRun ? "delete" : "keep", + roundOneReply: synthesizedText, + // Keep delivery outcome truthful for cron state: if outbound send fails, + // announce flow must report false so caller can apply best-effort policy. + bestEffortDeliver: false, + waitForCompletion: false, + startedAt: params.runStartedAt, + endedAt: params.runEndedAt, + outcome: { status: "ok" }, + announceType: "cron job", + signal: params.abortSignal, + }); + if (didAnnounce) { + delivered = true; + } else { + const message = "cron announce delivery failed"; + if (!params.deliveryBestEffort) { + return params.withRunSession({ + status: "error", + summary, + outputText, + error: message, + ...params.telemetry, + }); + } + logWarn(`[cron:${params.job.id}] ${message}`); + } + } catch (err) { + if (!params.deliveryBestEffort) { + return params.withRunSession({ + status: "error", + summary, + outputText, + error: String(err), + ...params.telemetry, + }); + } + logWarn(`[cron:${params.job.id}] ${String(err)}`); + } + return null; + }; + + if ( + params.deliveryRequested && + !params.skipHeartbeatDelivery && + !params.skipMessagingToolDelivery + ) { + if (!params.resolvedDelivery.ok) { + if (!params.deliveryBestEffort) { + return { + result: failDeliveryTarget(params.resolvedDelivery.error.message), + delivered, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + logWarn(`[cron:${params.job.id}] ${params.resolvedDelivery.error.message}`); + return { + result: params.withRunSession({ + status: "ok", + summary, + outputText, + ...params.telemetry, + }), + delivered, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + + // Route text-only cron announce output back through the main session so it + // follows the same system-message injection path as subagent completions. + // Keep direct outbound delivery only for structured payloads (media/channel + // data), which cannot be represented by the shared announce flow. + // + // Forum/topic targets should also use direct delivery. Announce flow can + // be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which + // silently drops cron output for topic-bound sessions. + const useDirectDelivery = + params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null; + if (useDirectDelivery) { + const directResult = await deliverViaDirect(params.resolvedDelivery); + if (directResult) { + return { + result: directResult, + delivered, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + } else { + const announceResult = await deliverViaAnnounce(params.resolvedDelivery); + if (announceResult) { + return { + result: announceResult, + delivered, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; + } + } + } + + return { + delivered, + summary, + outputText, + synthesizedText, + deliveryPayloads, + }; +} diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 01dae6ce2..ea6c819e2 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -21,10 +21,7 @@ import { resolveHooksGmailModel, resolveThinkingDefault, } from "../../agents/model-selection.js"; -import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; -import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; -import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js"; import { ensureAgentWorkspace } from "../../agents/workspace.js"; @@ -33,19 +30,11 @@ import { normalizeVerboseLevel, supportsXHighThinking, } from "../../auto-reply/thinking.js"; -import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; -import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; +import type { CliDeps } from "../../cli/outbound-send-deps.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { - resolveAgentMainSessionKey, - resolveSessionTranscriptPath, - updateSessionStore, -} from "../../config/sessions.js"; +import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js"; import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; -import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; -import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; -import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; import { logWarn } from "../../logger.js"; import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js"; import { @@ -56,6 +45,11 @@ import { } from "../../security/external-content.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; +import { + dispatchCronDelivery, + matchesMessagingToolDeliveryTarget, + resolveCronDeliveryBestEffort, +} from "./delivery-dispatch.js"; import { resolveDeliveryTarget } from "./delivery-target.js"; import { isHeartbeatOnlyResponse, @@ -67,74 +61,6 @@ import { } from "./helpers.js"; import { resolveCronSession } from "./session.js"; import { resolveCronSkillsSnapshot } from "./skills-snapshot.js"; -import { - expectsSubagentFollowup, - isLikelyInterimCronMessage, - readDescendantSubagentFallbackReply, - waitForDescendantSubagentSummary, -} from "./subagent-followup.js"; - -function matchesMessagingToolDeliveryTarget( - target: MessagingToolSend, - delivery: { channel?: string; to?: string; accountId?: string }, -): boolean { - if (!delivery.channel || !delivery.to || !target.to) { - return false; - } - const channel = delivery.channel.trim().toLowerCase(); - const provider = target.provider?.trim().toLowerCase(); - if (provider && provider !== "message" && provider !== channel) { - return false; - } - if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) { - return false; - } - return target.to === delivery.to; -} - -function resolveCronDeliveryBestEffort(job: CronJob): boolean { - if (typeof job.delivery?.bestEffort === "boolean") { - return job.delivery.bestEffort; - } - if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") { - return job.payload.bestEffortDeliver; - } - return false; -} - -async function resolveCronAnnounceSessionKey(params: { - cfg: OpenClawConfig; - agentId: string; - fallbackSessionKey: string; - delivery: { - channel: Parameters[0]["channel"]; - to?: string; - accountId?: string; - threadId?: string | number; - }; -}): Promise { - const to = params.delivery.to?.trim(); - if (!to) { - return params.fallbackSessionKey; - } - try { - const route = await resolveOutboundSessionRoute({ - cfg: params.cfg, - channel: params.delivery.channel, - agentId: params.agentId, - accountId: params.delivery.accountId, - target: to, - threadId: params.delivery.threadId, - }); - const resolved = route?.sessionKey?.trim(); - if (resolved) { - return resolved; - } - } catch { - // Fall back to main session routing if announce session resolution fails. - } - return params.fallbackSessionKey; -} export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ @@ -632,228 +558,39 @@ export async function runCronIsolatedAgentTurn(params: { }), ); - // `true` means we confirmed at least one outbound send reached the target. - // Keep this strict so timer fallback can safely decide whether to wake main. - let delivered = skipMessagingToolDelivery; - type SuccessfulDeliveryTarget = Extract< - Awaited>, - { ok: true } - >; - const failDeliveryTarget = (error: string) => - withRunSession({ - status: "error", - error, - errorKind: "delivery-target", - summary, - outputText, - ...telemetry, - }); - const deliverViaDirect = async ( - delivery: SuccessfulDeliveryTarget, - ): Promise => { - const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId); - try { - const payloadsForDelivery = - deliveryPayloads.length > 0 - ? deliveryPayloads - : synthesizedText - ? [{ text: synthesizedText }] - : []; - if (payloadsForDelivery.length === 0) { - return null; - } - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason(), ...telemetry }); - } - const deliveryResults = await deliverOutboundPayloads({ - cfg: cfgWithAgentDefaults, - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - payloads: payloadsForDelivery, - agentId, - identity, - bestEffort: deliveryBestEffort, - deps: createOutboundSendDeps(params.deps), - abortSignal, - }); - delivered = deliveryResults.length > 0; - return null; - } catch (err) { - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: String(err), - ...telemetry, - }); - } - return null; - } - }; - const deliverViaAnnounce = async ( - delivery: SuccessfulDeliveryTarget, - ): Promise => { - if (!synthesizedText) { - return null; - } - const announceMainSessionKey = resolveAgentMainSessionKey({ - cfg: params.cfg, - agentId, - }); - const announceSessionKey = await resolveCronAnnounceSessionKey({ - cfg: cfgWithAgentDefaults, - agentId, - fallbackSessionKey: announceMainSessionKey, - delivery: { - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - }, - }); - const taskLabel = - typeof params.job.name === "string" && params.job.name.trim() - ? params.job.name.trim() - : `cron:${params.job.id}`; - const initialSynthesizedText = synthesizedText.trim(); - let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); - const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); - const hadActiveDescendants = activeSubagentRuns > 0; - if (activeSubagentRuns > 0 || expectedSubagentFollowup) { - let finalReply = await waitForDescendantSubagentSummary({ - sessionKey: agentSessionKey, - initialReply: initialSynthesizedText, - timeoutMs, - observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup, - }); - activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); - if ( - !finalReply && - activeSubagentRuns === 0 && - (hadActiveDescendants || expectedSubagentFollowup) - ) { - finalReply = await readDescendantSubagentFallbackReply({ - sessionKey: agentSessionKey, - runStartedAt, - }); - } - if (finalReply && activeSubagentRuns === 0) { - outputText = finalReply; - summary = pickSummaryFromOutput(finalReply) ?? summary; - synthesizedText = finalReply; - deliveryPayloads = [{ text: finalReply }]; - } - } - if (activeSubagentRuns > 0) { - // Parent orchestration is still in progress; avoid announcing a partial - // update to the main requester. - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - } - if ( - (hadActiveDescendants || expectedSubagentFollowup) && - synthesizedText.trim() === initialSynthesizedText && - isLikelyInterimCronMessage(initialSynthesizedText) && - initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() - ) { - // Descendants existed but no post-orchestration synthesis arrived, so - // suppress stale parent text like "on it, pulling everything together". - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - } - if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { - return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry }); - } - try { - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason(), ...telemetry }); - } - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: agentSessionKey, - childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`, - requesterSessionKey: announceSessionKey, - requesterOrigin: { - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - }, - requesterDisplayKey: announceSessionKey, - task: taskLabel, - timeoutMs, - cleanup: params.job.deleteAfterRun ? "delete" : "keep", - roundOneReply: synthesizedText, - // Keep delivery outcome truthful for cron state: if outbound send fails, - // announce flow must report false so caller can apply best-effort policy. - bestEffortDeliver: false, - waitForCompletion: false, - startedAt: runStartedAt, - endedAt: runEndedAt, - outcome: { status: "ok" }, - announceType: "cron job", - signal: abortSignal, - }); - if (didAnnounce) { - delivered = true; - } else { - const message = "cron announce delivery failed"; - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: message, - ...telemetry, - }); - } - logWarn(`[cron:${params.job.id}] ${message}`); - } - } catch (err) { - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: String(err), - ...telemetry, - }); - } - logWarn(`[cron:${params.job.id}] ${String(err)}`); - } - return null; - }; - if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) { - if (!resolvedDelivery.ok) { - if (!deliveryBestEffort) { - return failDeliveryTarget(resolvedDelivery.error.message); - } - logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`); - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - } - - // Route text-only cron announce output back through the main session so it - // follows the same system-message injection path as subagent completions. - // Keep direct outbound delivery only for structured payloads (media/channel - // data), which cannot be represented by the shared announce flow. - // - // Forum/topic targets should also use direct delivery. Announce flow can - // be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which - // silently drops cron output for topic-bound sessions. - const useDirectDelivery = - deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null; - if (useDirectDelivery) { - const directResult = await deliverViaDirect(resolvedDelivery); - if (directResult) { - return directResult; - } - } else { - const announceResult = await deliverViaAnnounce(resolvedDelivery); - if (announceResult) { - return announceResult; - } - } + const deliveryResult = await dispatchCronDelivery({ + cfg: params.cfg, + cfgWithAgentDefaults, + deps: params.deps, + job: params.job, + agentId, + agentSessionKey, + runSessionId, + runStartedAt, + runEndedAt, + timeoutMs, + resolvedDelivery, + deliveryRequested, + skipHeartbeatDelivery, + skipMessagingToolDelivery, + deliveryBestEffort, + deliveryPayloadHasStructuredContent, + deliveryPayloads, + synthesizedText, + summary, + outputText, + telemetry, + abortSignal, + isAborted, + abortReason, + withRunSession, + }); + if (deliveryResult.result) { + return deliveryResult.result; } + const delivered = deliveryResult.delivered; + summary = deliveryResult.summary; + outputText = deliveryResult.outputText; return withRunSession({ status: "ok", summary, outputText, delivered, ...telemetry }); }