diff --git a/.gitignore b/.gitignore index a0eb56c86..f9f3bc99b 100644 --- a/.gitignore +++ b/.gitignore @@ -75,3 +75,4 @@ USER.md memory/ .agent/*.json !.agent/workflows/ +local/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dddd50f1..430abee75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Cron: route text-only isolated agent announces through the shared subagent announce flow; add exponential backoff for repeated errors; preserve future `nextRunAtMs` on restart; include current-boundary schedule matches; prevent stale threadId reuse across targets; and add per-job execution timeout. (#11641) Thanks @tyler6204. - Agents: recover from context overflow caused by oversized tool results (pre-emptive capping + fallback truncation). (#11579) Thanks @tyler6204. - Gateway/CLI: when `gateway.bind=lan`, use a LAN IP for probe URLs and Control UI links. (#11448) Thanks @AnonO6. - Memory: set Voyage embeddings `input_type` for improved retrieval. (#10818) Thanks @mcinteerj. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 25661ae7f..b1e5ef9a1 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -464,6 +464,13 @@ openclaw system event --mode now --text "Next heartbeat: check battery." - Check the Gateway is running continuously (cron runs inside the Gateway process). - For `cron` schedules: confirm timezone (`--tz`) vs the host timezone. +### A recurring job keeps delaying after failures + +- OpenClaw applies exponential retry backoff for recurring jobs after consecutive errors: + 30s, 1m, 5m, 15m, then 60m between retries. +- Backoff resets automatically after the next successful run. +- One-shot (`at`) jobs are disabled after a terminal run (`ok`, `error`, or `skipped`) and do not retry. + ### Telegram delivers to the wrong place - For forum topics, use `-100…:topic:<id>` so it’s explicit and unambiguous. 
diff --git a/docs/cli/cron.md b/docs/cli/cron.md index c28da2638..3e56db971 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -21,6 +21,8 @@ output internal. `--deliver` remains as a deprecated alias for `--announce`. Note: one-shot (`--at`) jobs delete after success by default. Use `--keep-after-run` to keep them. +Note: recurring jobs now use exponential retry backoff after consecutive errors (30s → 1m → 5m → 15m → 60m), then return to normal schedule after the next successful run. + ## Common edits Update delivery settings without changing the message: diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts index 00d622dec..d2f7a05be 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts @@ -245,7 +245,7 @@ describe("openclaw-tools: subagents", () => { | undefined; expect(second?.sessionKey).toBe("discord:group:req"); expect(second?.deliver).toBe(true); - expect(second?.message).toContain("background task"); + expect(second?.message).toContain("subagent task"); const sendCalls = calls.filter((c) => c.method === "send"); expect(sendCalls.length).toBe(0); diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts index 30c32aff1..9f7799848 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts @@ -153,7 +153,7 @@ describe("openclaw-tools: subagents", () => { // Second call: main agent trigger (not "Sub-agent announce step." 
anymore) const second = agentCalls[1]?.params as { sessionKey?: string; message?: string } | undefined; expect(second?.sessionKey).toBe("main"); - expect(second?.message).toContain("background task"); + expect(second?.message).toContain("subagent task"); // No direct send to external channel (main agent handles delivery) const sendCalls = calls.filter((c) => c.method === "send"); diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index e00aae60a..2f137dc4d 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -94,7 +94,7 @@ describe("subagent announce formatting", () => { }; const msg = call?.params?.message as string; expect(call?.params?.sessionKey).toBe("agent:main:main"); - expect(msg).toContain("background task"); + expect(msg).toContain("subagent task"); expect(msg).toContain("failed"); expect(msg).toContain("boom"); expect(msg).toContain("Findings:"); @@ -155,7 +155,7 @@ describe("subagent announce formatting", () => { expect(didAnnounce).toBe(true); expect(embeddedRunMock.queueEmbeddedPiMessage).toHaveBeenCalledWith( "session-123", - expect.stringContaining("background task"), + expect.stringContaining("subagent task"), ); expect(agentSpy).not.toHaveBeenCalled(); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index b83a543bf..24de2c2bc 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -345,6 +345,8 @@ export type SubagentRunOutcome = { error?: string; }; +export type SubagentAnnounceType = "subagent task" | "cron job"; + export async function runSubagentAnnounceFlow(params: { childSessionKey: string; childRunId: string; @@ -360,6 +362,7 @@ export async function runSubagentAnnounceFlow(params: { endedAt?: number; label?: string; outcome?: SubagentRunOutcome; + announceType?: SubagentAnnounceType; }): Promise { let didAnnounce = false; try { @@ -433,9 +436,10 @@ export async 
function runSubagentAnnounceFlow(params: { : "finished with unknown status"; // Build instructional message for main agent - const taskLabel = params.label || params.task || "background task"; + const announceType = params.announceType ?? "subagent task"; + const taskLabel = params.label || params.task || "task"; const triggerMessage = [ - `A background task "${taskLabel}" just ${statusLabel}.`, + `A ${announceType} "${taskLabel}" just ${statusLabel}.`, "", "Findings:", reply || "(no output)", @@ -443,7 +447,7 @@ export async function runSubagentAnnounceFlow(params: { statsLine, "", "Summarize this naturally for the user. Keep it brief (1-2 sentences). Flow it into the conversation naturally.", - "Do not mention technical details like tokens, stats, or that this was a background task.", + `Do not mention technical details like tokens, stats, or that this was a ${announceType}.`, "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).", ].join("\n"); diff --git a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts index 5d3a7caf2..674763f8e 100644 --- a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts +++ b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts @@ -17,9 +17,13 @@ vi.mock("../agents/pi-embedded.js", () => ({ vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), })); +vi.mock("../agents/subagent-announce.js", () => ({ + runSubagentAnnounceFlow: vi.fn(), +})); import { loadModelCatalog } from "../agents/model-catalog.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; async function withTempHome(fn: (home: string) => Promise): Promise { @@ -86,6 +90,7 @@ 
describe("runCronIsolatedAgentTurn", () => { beforeEach(() => { vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(loadModelCatalog).mockResolvedValue([]); + vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true); setActivePluginRegistry( createTestRegistry([ { @@ -136,10 +141,11 @@ describe("runCronIsolatedAgentTurn", () => { expect(res.status).toBe("ok"); expect(deps.sendMessageTelegram).toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); }); }); - it("delivers when heartbeat ack padding exceeds configured limit", async () => { + it("uses shared announce flow when heartbeat ack padding exceeds configured limit", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); const deps: CliDeps = { @@ -185,7 +191,8 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); - expect(deps.sendMessageTelegram).toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); }); diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 4b5317ef4..4b0d04d18 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -17,9 +17,13 @@ vi.mock("../agents/pi-embedded.js", () => ({ vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), })); +vi.mock("../agents/subagent-announce.js", () => ({ + runSubagentAnnounceFlow: vi.fn(), +})); import { loadModelCatalog } from "../agents/model-catalog.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; import { runCronIsolatedAgentTurn } 
from "./isolated-agent.js"; async function withTempHome(fn: (home: string) => Promise): Promise { @@ -86,6 +90,7 @@ describe("runCronIsolatedAgentTurn", () => { beforeEach(() => { vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(loadModelCatalog).mockResolvedValue([]); + vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true); setActivePluginRegistry( createTestRegistry([ { @@ -97,7 +102,7 @@ describe("runCronIsolatedAgentTurn", () => { ); }); - it("announces when delivery is requested", async () => { + it("announces via shared subagent flow when delivery is requested", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); const deps: CliDeps = { @@ -130,11 +135,16 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); - expect(deps.sendMessageTelegram).toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { announceType?: string } + | undefined; + expect(announceArgs?.announceType).toBe("cron job"); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); - it("announces only the final payload text", async () => { + it("passes final payload text into shared subagent announce flow", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); const deps: CliDeps = { @@ -167,12 +177,71 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); - expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); - expect(deps.sendMessageTelegram).toHaveBeenCalledWith( - "123", - "Final weather summary", - expect.any(Object), + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { roundOneReply?: string; requesterOrigin?: { threadId?: string | number } } + | undefined; + 
expect(announceArgs?.roundOneReply).toBe("Final weather summary"); + expect(announceArgs?.requesterOrigin?.threadId).toBeUndefined(); + }); + }); + + it("passes resolved threadId into shared subagent announce flow", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + await fs.writeFile( + storePath, + JSON.stringify( + { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now(), + lastChannel: "telegram", + lastTo: "123", + lastThreadId: 42, + }, + }, + null, + 2, + ), + "utf-8", ); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "Final weather summary" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "last" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { requesterOrigin?: { threadId?: string | number; channel?: string; to?: string } } + | undefined; + expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); + expect(announceArgs?.requesterOrigin?.to).toBe("123"); + expect(announceArgs?.requesterOrigin?.threadId).toBe(42); }); }); @@ -211,6 +280,7 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); @@ -248,11 +318,12 @@ describe("runCronIsolatedAgentTurn", () => { }); 
expect(res.status).toBe("ok"); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); - it("fails when announce delivery fails and best-effort is disabled", async () => { + it("fails when shared announce flow fails and best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); const deps: CliDeps = { @@ -269,6 +340,7 @@ describe("runCronIsolatedAgentTurn", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); const res = await runCronIsolatedAgentTurn({ cfg: makeCfg(home, storePath, { channels: { telegram: { botToken: "t-1" } }, @@ -284,11 +356,11 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("error"); - expect(res.error).toBe("Error: boom"); + expect(res.error).toBe("cron announce delivery failed"); }); }); - it("ignores announce delivery failures when best-effort is enabled", async () => { + it("ignores shared announce flow failures when best-effort is enabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); const deps: CliDeps = { @@ -305,6 +377,7 @@ describe("runCronIsolatedAgentTurn", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); const res = await runCronIsolatedAgentTurn({ cfg: makeCfg(home, storePath, { channels: { telegram: { botToken: "t-1" } }, @@ -325,7 +398,8 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); - expect(deps.sendMessageTelegram).toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 35ccc9047..4c6fe7681 100644 --- 
a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -70,12 +70,21 @@ export async function resolveDeliveryTarget( const mode = resolved.mode as "explicit" | "implicit"; const toCandidate = resolved.to; + // Only carry threadId when delivering to the same recipient as the session's + // last conversation. This prevents stale thread IDs (e.g. from a Telegram + // supergroup topic) from being sent to a different target (e.g. a private + // chat) where they would cause API errors. + const threadId = + resolved.threadId && resolved.to && resolved.to === resolved.lastTo + ? resolved.threadId + : undefined; + if (!toCandidate) { return { channel, to: undefined, accountId: resolved.accountId, - threadId: resolved.threadId, + threadId, mode, }; } @@ -91,7 +100,7 @@ export async function resolveDeliveryTarget( channel, to: docked.ok ? docked.to : undefined, accountId: resolved.accountId, - threadId: resolved.threadId, + threadId, mode, error: docked.ok ? 
undefined : docked.error, }; diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 3dd0cc416..2ae856b37 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -31,6 +31,7 @@ import { import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; +import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; import { ensureAgentWorkspace } from "../../agents/workspace.js"; @@ -40,7 +41,11 @@ import { supportsXHighThinking, } from "../../auto-reply/thinking.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; -import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js"; +import { + resolveAgentMainSessionKey, + resolveSessionTranscriptPath, + updateSessionStore, +} from "../../config/sessions.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; @@ -358,6 +363,8 @@ export async function runCronIsolatedAgentTurn(params: { let runResult: Awaited>; let fallbackProvider = provider; let fallbackModel = model; + const runStartedAt = Date.now(); + let runEndedAt = runStartedAt; try { const sessionFile = resolveSessionTranscriptPath(cronSession.sessionEntry.sessionId, agentId); const resolvedVerboseLevel = @@ -420,6 +427,7 @@ export async function runCronIsolatedAgentTurn(params: { runResult = fallbackResult.result; fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; + runEndedAt = Date.now(); } catch (err) { return withRunSession({ status: "error", error: String(err) 
}); } @@ -465,6 +473,10 @@ export async function runCronIsolatedAgentTurn(params: { : synthesizedText ? [{ text: synthesizedText }] : []; + const deliveryPayloadHasStructuredContent = + Boolean(deliveryPayload?.mediaUrl) || + (deliveryPayload?.mediaUrls?.length ?? 0) > 0 || + Object.keys(deliveryPayload?.channelData ?? {}).length > 0; const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job); // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). @@ -507,20 +519,73 @@ export async function runCronIsolatedAgentTurn(params: { logWarn(`[cron:${params.job.id}] ${message}`); return withRunSession({ status: "ok", summary, outputText }); } - try { - await deliverOutboundPayloads({ - cfg: cfgWithAgentDefaults, - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - threadId: resolvedDelivery.threadId, - payloads: deliveryPayloads, - bestEffort: deliveryBestEffort, - deps: createOutboundSendDeps(params.deps), + // Shared subagent announce flow is text-based; keep direct outbound delivery + // for media/channel payloads so structured content is preserved. 
+ if (deliveryPayloadHasStructuredContent) { + try { + await deliverOutboundPayloads({ + cfg: cfgWithAgentDefaults, + channel: resolvedDelivery.channel, + to: resolvedDelivery.to, + accountId: resolvedDelivery.accountId, + threadId: resolvedDelivery.threadId, + payloads: deliveryPayloads, + bestEffort: deliveryBestEffort, + deps: createOutboundSendDeps(params.deps), + }); + } catch (err) { + if (!deliveryBestEffort) { + return withRunSession({ status: "error", summary, outputText, error: String(err) }); + } + } + } else if (synthesizedText) { + const announceSessionKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId, }); - } catch (err) { - if (!deliveryBestEffort) { - return withRunSession({ status: "error", summary, outputText, error: String(err) }); + const taskLabel = + typeof params.job.name === "string" && params.job.name.trim() + ? params.job.name.trim() + : `cron:${params.job.id}`; + try { + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: runSessionKey, + childRunId: `${params.job.id}:${runSessionId}`, + requesterSessionKey: announceSessionKey, + requesterOrigin: { + channel: resolvedDelivery.channel, + to: resolvedDelivery.to, + accountId: resolvedDelivery.accountId, + threadId: resolvedDelivery.threadId, + }, + requesterDisplayKey: announceSessionKey, + task: taskLabel, + timeoutMs, + cleanup: "keep", + roundOneReply: synthesizedText, + waitForCompletion: false, + startedAt: runStartedAt, + endedAt: runEndedAt, + outcome: { status: "ok" }, + announceType: "cron job", + }); + if (!didAnnounce) { + const message = "cron announce delivery failed"; + if (!deliveryBestEffort) { + return withRunSession({ + status: "error", + summary, + outputText, + error: message, + }); + } + logWarn(`[cron:${params.job.id}] ${message}`); + } + } catch (err) { + if (!deliveryBestEffort) { + return withRunSession({ status: "error", summary, outputText, error: String(err) }); + } + logWarn(`[cron:${params.job.id}] ${String(err)}`); } } } diff 
--git a/src/cron/schedule.ts b/src/cron/schedule.ts index fc13ebfe2..090926591 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -49,17 +49,13 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe timezone: resolveCronTimezone(schedule.tz), catch: false, }); - let cursor = nowMs; - for (let attempt = 0; attempt < 3; attempt++) { - const next = cron.nextRun(new Date(cursor)); - if (!next) { - return undefined; - } - const nextMs = next.getTime(); - if (Number.isFinite(nextMs) && nextMs > nowMs) { - return nextMs; - } - cursor += 1_000; + // Use a tiny lookback (1ms) so croner doesn't skip the current second + // boundary. Without this, a job updated at exactly its cron time would + // be scheduled for the *next* matching time (e.g. 24h later for daily). + const next = cron.nextRun(new Date(nowMs - 1)); + if (!next) { + return undefined; } - return undefined; + const nextMs = next.getTime(); + return Number.isFinite(nextMs) && nextMs >= nowMs ? nextMs : undefined; } diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index fbd96d34d..b51cfab8d 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -118,10 +118,17 @@ export function recomputeNextRuns(state: CronServiceState): boolean { job.state.runningAtMs = undefined; changed = true; } - const newNext = computeJobNextRunAtMs(job, now); - if (job.state.nextRunAtMs !== newNext) { - job.state.nextRunAtMs = newNext; - changed = true; + // Only recompute if nextRunAtMs is missing or already past-due. + // Preserving a still-future nextRunAtMs avoids accidentally advancing + // a job that hasn't fired yet (e.g. during restart recovery). 
+ const nextRun = job.state.nextRunAtMs; + const isDueOrMissing = nextRun === undefined || now >= nextRun; + if (isDueOrMissing) { + const newNext = computeJobNextRunAtMs(job, now); + if (job.state.nextRunAtMs !== newNext) { + job.state.nextRunAtMs = newNext; + changed = true; + } } } return changed; @@ -380,6 +387,9 @@ function mergeCronDelivery( } export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) { + if (!job.state) { + job.state = {}; + } if (typeof job.state.runningAtMs === "number") { return false; } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 545261e97..982a09e88 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -52,6 +52,12 @@ export function stop(state: CronServiceState) { export async function status(state: CronServiceState) { return await locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); + if (state.store) { + const changed = recomputeNextRuns(state); + if (changed) { + await persist(state); + } + } return { enabled: state.deps.cronEnabled, storePath: state.deps.storePath, @@ -64,6 +70,12 @@ export async function status(state: CronServiceState) { export async function list(state: CronServiceState, opts?: { includeDisabled?: boolean }) { return await locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); + if (state.store) { + const changed = recomputeNextRuns(state); + if (changed) { + await persist(state); + } + } const includeDisabled = opts?.includeDisabled === true; const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled); return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 
0)); @@ -76,8 +88,25 @@ export async function add(state: CronServiceState, input: CronJobCreate) { await ensureLoaded(state); const job = createJob(state, input); state.store?.jobs.push(job); + + // Defensive: recompute all next-run times to ensure consistency + recomputeNextRuns(state); + await persist(state); armTimer(state); + + state.deps.log.info( + { + jobId: job.id, + jobName: job.name, + nextRunAtMs: job.state.nextRunAtMs, + schedulerNextWakeAtMs: nextWakeAtMs(state) ?? null, + timerArmed: state.timer !== null, + cronEnabled: state.deps.cronEnabled, + }, + "cron: job added", + ); + emit(state, { jobId: job.id, action: "added", @@ -110,12 +139,17 @@ export async function update(state: CronServiceState, id: string, patch: CronJob }; } } + const scheduleChanged = patch.schedule !== undefined; + const enabledChanged = patch.enabled !== undefined; + job.updatedAtMs = now; - if (job.enabled) { - job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); - } else { - job.state.nextRunAtMs = undefined; - job.state.runningAtMs = undefined; + if (scheduleChanged || enabledChanged) { + if (job.enabled) { + job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); + } else { + job.state.nextRunAtMs = undefined; + job.state.runningAtMs = undefined; + } } await persist(state); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 8e9bfb2d5..d18deddc6 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -13,19 +13,131 @@ import { ensureLoaded, persist } from "./store.js"; const MAX_TIMER_DELAY_MS = 60_000; +/** + * Maximum wall-clock time for a single job execution. Acts as a safety net + * on top of the per-provider / per-agent timeouts to prevent one stuck job + * from wedging the entire cron lane. + */ +const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes + +/** + * Exponential backoff delays (in ms) indexed by consecutive error count. + * After the last entry the delay stays constant. 
/**
 * Exponential backoff delays (in ms) indexed by consecutive error count.
 * After the last entry the delay stays constant.
 */
const ERROR_BACKOFF_SCHEDULE_MS: readonly number[] = [
  30_000, // 1st error → 30 s
  60_000, // 2nd error → 1 min
  5 * 60_000, // 3rd error → 5 min
  15 * 60_000, // 4th error → 15 min
  60 * 60_000, // 5th+ error → 60 min
];

/**
 * Look up the retry delay for a job that has failed `consecutiveErrors` times in a row.
 *
 * Counts below 1 use the first entry; counts past the end of the table use the
 * last entry. `NaN` is treated as a first error and fractional counts are
 * floored, so the lookup always yields a real delay instead of `undefined`
 * (which would turn `endedAt + backoff` into `NaN` in the caller).
 *
 * @param consecutiveErrors Number of consecutive failed runs (normally >= 1).
 * @returns Delay in milliseconds to wait before the next retry.
 */
function errorBackoffMs(consecutiveErrors: number): number {
  const lastIdx = ERROR_BACKOFF_SCHEDULE_MS.length - 1;
  const count = Number.isNaN(consecutiveErrors) ? 1 : Math.floor(consecutiveErrors);
  const idx = Math.min(Math.max(count - 1, 0), lastIdx);
  // The fallback is unreachable while the table is non-empty; it keeps the
  // return type a plain `number` under `noUncheckedIndexedAccess`.
  return ERROR_BACKOFF_SCHEDULE_MS[idx] ?? 60 * 60_000;
}

/**
 * Apply the result of a job execution to the job's state.
 * Handles consecutive error tracking, exponential backoff, one-shot disable,
 * and nextRunAtMs computation.
 *
 * @param state  Service state; only `state.deps.log` is used here.
 * @param job    Job to update in place (`job.state`, `job.enabled`, `job.updatedAtMs`).
 * @param result Terminal status of the run plus its start/end timestamps (ms).
 * @returns `true` if the job should be deleted by the caller (a one-shot `at`
 *          job that finished `ok` with `deleteAfterRun === true`).
 */
function applyJobResult(
  state: CronServiceState,
  job: CronJob,
  result: {
    status: "ok" | "error" | "skipped";
    error?: string;
    startedAt: number;
    endedAt: number;
  },
): boolean {
  // Record the run itself.
  job.state.runningAtMs = undefined;
  job.state.lastRunAtMs = result.startedAt;
  job.state.lastStatus = result.status;
  job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt);
  job.state.lastError = result.error;
  job.updatedAtMs = result.endedAt;

  // Track consecutive errors for backoff; any non-error status resets the streak.
  if (result.status === "error") {
    job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
  } else {
    job.state.consecutiveErrors = 0;
  }

  const shouldDelete =
    job.schedule.kind === "at" && result.status === "ok" && job.deleteAfterRun === true;

  if (!shouldDelete) {
    if (job.schedule.kind === "at") {
      // One-shot jobs are always disabled after ANY terminal status
      // (ok, error, or skipped). This prevents tight-loop rescheduling
      // when computeJobNextRunAtMs returns the past atMs value (#11452).
      job.enabled = false;
      job.state.nextRunAtMs = undefined;
      if (result.status === "error") {
        state.deps.log.warn(
          {
            jobId: job.id,
            jobName: job.name,
            consecutiveErrors: job.state.consecutiveErrors,
            error: result.error,
          },
          "cron: disabling one-shot job after error",
        );
      }
    } else if (result.status === "error" && job.enabled) {
      // Apply exponential backoff for errored jobs to prevent retry storms.
      const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
      const normalNext = computeJobNextRunAtMs(job, result.endedAt);
      const backoffNext = result.endedAt + backoff;
      // Use whichever is later: the natural next run or the backoff delay.
      job.state.nextRunAtMs =
        normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext;
      state.deps.log.info(
        {
          jobId: job.id,
          consecutiveErrors: job.state.consecutiveErrors,
          backoffMs: backoff,
          nextRunAtMs: job.state.nextRunAtMs,
        },
        "cron: applying error backoff",
      );
    } else if (job.enabled) {
      job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt);
    } else {
      job.state.nextRunAtMs = undefined;
    }
  }

  return shouldDelete;
}
0; + state.deps.log.debug( + { jobCount, enabledCount, withNextRun }, + "cron: armTimer skipped - no jobs with nextRunAtMs", + ); return; } - const delay = Math.max(nextAt - state.deps.nowMs(), 0); + const now = state.deps.nowMs(); + const delay = Math.max(nextAt - now, 0); // Wake at least once a minute to avoid schedule drift and recover quickly // when the process was paused or wall-clock time jumps. const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); @@ -36,6 +148,10 @@ export function armTimer(state: CronServiceState) { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); } }, clampedDelay); + state.deps.log.debug( + { nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS }, + "cron: timer armed", + ); } export async function onTimer(state: CronServiceState) { @@ -84,10 +200,29 @@ export async function onTimer(state: CronServiceState) { const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); + + const jobTimeoutMs = + job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" + ? 
job.payload.timeoutSeconds * 1_000 + : DEFAULT_JOB_TIMEOUT_MS; + try { - const result = await executeJobCore(state, job); + let timeoutId: NodeJS.Timeout; + const result = await Promise.race([ + executeJobCore(state, job), + new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new Error("cron: job execution timed out")), + jobTimeoutMs, + ); + }), + ]).finally(() => clearTimeout(timeoutId!)); results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); } catch (err) { + state.deps.log.warn( + { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs }, + `cron: job failed: ${String(err)}`, + ); results.push({ jobId: id, status: "error", @@ -108,26 +243,12 @@ export async function onTimer(state: CronServiceState) { continue; } - const startedAt = result.startedAt; - job.state.runningAtMs = undefined; - job.state.lastRunAtMs = startedAt; - job.state.lastStatus = result.status; - job.state.lastDurationMs = Math.max(0, result.endedAt - startedAt); - job.state.lastError = result.error; - - const shouldDelete = - job.schedule.kind === "at" && result.status === "ok" && job.deleteAfterRun === true; - - if (!shouldDelete) { - if (job.schedule.kind === "at" && result.status === "ok") { - job.enabled = false; - job.state.nextRunAtMs = undefined; - } else if (job.enabled) { - job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt); - } else { - job.state.nextRunAtMs = undefined; - } - } + const shouldDelete = applyJobResult(state, job, { + status: result.status, + error: result.error, + startedAt: result.startedAt, + endedAt: result.endedAt, + }); emit(state, { jobId: job.id, @@ -137,7 +258,7 @@ export async function onTimer(state: CronServiceState) { summary: result.summary, sessionId: result.sessionId, sessionKey: result.sessionKey, - runAtMs: startedAt, + runAtMs: result.startedAt, durationMs: job.state.lastDurationMs, nextRunAtMs: job.state.nextRunAtMs, }); @@ -146,8 +267,6 @@ export async function onTimer(state: 
CronServiceState) { state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); emit(state, { jobId: job.id, action: "removed" }); } - - job.updatedAtMs = result.endedAt; } recomputeNextRuns(state); @@ -166,6 +285,9 @@ function findDueJobs(state: CronServiceState): CronJob[] { } const now = state.deps.nowMs(); return state.store.jobs.filter((j) => { + if (!j.state) { + j.state = {}; + } if (!j.enabled) { return false; } @@ -183,6 +305,9 @@ export async function runMissedJobs(state: CronServiceState) { } const now = state.deps.nowMs(); const missed = state.store.jobs.filter((j) => { + if (!j.state) { + j.state = {}; + } if (!j.enabled) { return false; } @@ -213,6 +338,9 @@ export async function runDueJobs(state: CronServiceState) { } const now = state.deps.nowMs(); const due = state.store.jobs.filter((j) => { + if (!j.state) { + j.state = {}; + } if (!j.enabled) { return false; } @@ -323,76 +451,54 @@ async function executeJobCore( export async function executeJob( state: CronServiceState, job: CronJob, - nowMs: number, - opts: { forced: boolean }, + _nowMs: number, + _opts: { forced: boolean }, ) { + if (!job.state) { + job.state = {}; + } const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; job.state.lastError = undefined; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); - let deleted = false; - - const finish = async ( - status: "ok" | "error" | "skipped", - err?: string, - summary?: string, - session?: { sessionId?: string; sessionKey?: string }, - ) => { - const endedAt = state.deps.nowMs(); - job.state.runningAtMs = undefined; - job.state.lastRunAtMs = startedAt; - job.state.lastStatus = status; - job.state.lastDurationMs = Math.max(0, endedAt - startedAt); - job.state.lastError = err; - - const shouldDelete = - job.schedule.kind === "at" && status === "ok" && job.deleteAfterRun === true; - - if (!shouldDelete) { - if (job.schedule.kind === "at" && status === "ok") { - job.enabled = false; - 
job.state.nextRunAtMs = undefined; - } else if (job.enabled) { - job.state.nextRunAtMs = computeJobNextRunAtMs(job, endedAt); - } else { - job.state.nextRunAtMs = undefined; - } - } - - emit(state, { - jobId: job.id, - action: "finished", - status, - error: err, - summary, - sessionId: session?.sessionId, - sessionKey: session?.sessionKey, - runAtMs: startedAt, - durationMs: job.state.lastDurationMs, - nextRunAtMs: job.state.nextRunAtMs, - }); - - if (shouldDelete && state.store) { - state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); - deleted = true; - emit(state, { jobId: job.id, action: "removed" }); - } + let coreResult: { + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; }; - try { - const result = await executeJobCore(state, job); - await finish(result.status, result.error, result.summary, { - sessionId: result.sessionId, - sessionKey: result.sessionKey, - }); + coreResult = await executeJobCore(state, job); } catch (err) { - await finish("error", String(err)); - } finally { - job.updatedAtMs = nowMs; - if (!opts.forced && job.enabled && !deleted) { - job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs()); - } + coreResult = { status: "error", error: String(err) }; + } + + const endedAt = state.deps.nowMs(); + const shouldDelete = applyJobResult(state, job, { + status: coreResult.status, + error: coreResult.error, + startedAt, + endedAt, + }); + + emit(state, { + jobId: job.id, + action: "finished", + status: coreResult.status, + error: coreResult.error, + summary: coreResult.summary, + sessionId: coreResult.sessionId, + sessionKey: coreResult.sessionKey, + runAtMs: startedAt, + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + + if (shouldDelete && state.store) { + state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); + emit(state, { jobId: job.id, action: "removed" }); } } diff --git a/src/cron/types.ts 
b/src/cron/types.ts index 736d5529e..97cc6f51f 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -59,6 +59,8 @@ export type CronJobState = { lastStatus?: "ok" | "error" | "skipped"; lastError?: string; lastDurationMs?: number; + /** Number of consecutive execution errors (reset on success). Used for backoff. */ + consecutiveErrors?: number; }; export type CronJob = { diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index c8238c50f..345690c83 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -107,6 +107,7 @@ export const CronJobStateSchema = Type.Object( ), lastError: Type.Optional(Type.String()), lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })), + consecutiveErrors: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, ); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index a73c57595..09c8ddd59 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -96,6 +96,13 @@ const EXEC_EVENT_PROMPT = "Please relay the command output to the user in a helpful way. If the command succeeded, share the relevant output. " + "If it failed, explain what went wrong."; +// Prompt used when a scheduled cron job has fired and injected a system event. +// This overrides the standard heartbeat prompt so the model relays the scheduled +// reminder instead of responding with "HEARTBEAT_OK". +const CRON_EVENT_PROMPT = + "A scheduled reminder has been triggered. The reminder message is shown in the system messages above. " + + "Please relay this reminder to the user in a helpful and friendly way."; + function resolveActiveHoursTimezone(cfg: OpenClawConfig, raw?: string): string { const trimmed = raw?.trim(); if (!trimmed || trimmed === "user") { @@ -512,13 +519,19 @@ export async function runHeartbeatOnce(opts: { // Skip heartbeat if HEARTBEAT.md exists but has no actionable content. 
// This saves API calls/costs when the file is effectively empty (only comments/headers). - // EXCEPTION: Don't skip for exec events - they have pending system events to process. + // EXCEPTION: Don't skip for exec events or cron events - they have pending system events + // to process regardless of HEARTBEAT.md content. const isExecEventReason = opts.reason === "exec-event"; + const isCronEventReason = Boolean(opts.reason?.startsWith("cron:")); const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); const heartbeatFilePath = path.join(workspaceDir, DEFAULT_HEARTBEAT_FILENAME); try { const heartbeatFileContent = await fs.readFile(heartbeatFilePath, "utf-8"); - if (isHeartbeatContentEffectivelyEmpty(heartbeatFileContent) && !isExecEventReason) { + if ( + isHeartbeatContentEffectivelyEmpty(heartbeatFileContent) && + !isExecEventReason && + !isCronEventReason + ) { emitHeartbeatEvent({ status: "skipped", reason: "empty-heartbeat-file", @@ -561,19 +574,25 @@ export async function runHeartbeatOnce(opts: { accountId: delivery.accountId, }).responsePrefix; - // Check if this is an exec event with pending exec completion system events. + // Check if this is an exec event or cron event with pending system events. // If so, use a specialized prompt that instructs the model to relay the result // instead of the standard heartbeat prompt with "reply HEARTBEAT_OK". const isExecEvent = opts.reason === "exec-event"; - const pendingEvents = isExecEvent ? peekSystemEvents(sessionKey) : []; + const isCronEvent = Boolean(opts.reason?.startsWith("cron:")); + const pendingEvents = isExecEvent || isCronEvent ? peekSystemEvents(sessionKey) : []; const hasExecCompletion = pendingEvents.some((evt) => evt.includes("Exec finished")); + const hasCronEvents = isCronEvent && pendingEvents.length > 0; - const prompt = hasExecCompletion ? EXEC_EVENT_PROMPT : resolveHeartbeatPrompt(cfg, heartbeat); + const prompt = hasExecCompletion + ? EXEC_EVENT_PROMPT + : hasCronEvents + ? 
CRON_EVENT_PROMPT + : resolveHeartbeatPrompt(cfg, heartbeat); const ctx = { Body: prompt, From: sender, To: sender, - Provider: hasExecCompletion ? "exec-event" : "heartbeat", + Provider: hasExecCompletion ? "exec-event" : hasCronEvents ? "cron-event" : "heartbeat", SessionKey: sessionKey, }; if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) {