From aa4c250eb8b7110cdfe5a97db5a2b43ba6ebab59 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 20:07:34 +0100 Subject: [PATCH] fix(cron): split run and delivery status tracking --- CHANGELOG.md | 1 + src/cron/run-log.test.ts | 6 ++- src/cron/run-log.ts | 15 +++++- .../service.persists-delivered-status.test.ts | 49 +++++++++++++++++-- src/cron/service/ops.ts | 2 + src/cron/service/state.ts | 3 ++ src/cron/service/timer.ts | 25 +++++++++- src/cron/types.ts | 8 +++ src/gateway/protocol/schema/cron.ts | 25 +++++++--- src/gateway/server-cron.ts | 2 + src/gateway/server.cron.test.ts | 5 ++ 11 files changed, 128 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9d000a42..4b53d52eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ Docs: https://docs.openclaw.ai - Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves. - Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves. - Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl. +- Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. - Feishu/Media: for inbound video messages that include both `file_key` (video) and `image_key` (thumbnail), prefer `file_key` when downloading media so video attachments are saved instead of silently failing on thumbnail keys. (#23633) - Hooks/Cron: suppress duplicate main-session events for delivered hook turns and mark `SILENT_REPLY_TOKEN` (`NO_REPLY`) early exits as delivered to prevent hook context pollution. (#20678) Thanks @JonathanWorks. diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index a2a31970b..288968801 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -105,7 +105,7 @@ describe("cron run log", () => { }); }); - it("ignores invalid and non-finished lines while preserving delivered flag", async () => { + it("ignores invalid and non-finished lines while preserving delivery fields", async () => { await withRunLogDir("openclaw-cron-log-filter-", async (dir) => { const logPath = path.join(dir, "runs", "job-1.jsonl"); await fs.mkdir(path.dirname(logPath), { recursive: true }); @@ -120,6 +120,8 @@ describe("cron run log", () => { action: "finished", status: "ok", delivered: true, + deliveryStatus: "not-delivered", + deliveryError: "announce failed", }), ].join("\n") + "\n", "utf-8", @@ -129,6 +131,8 @@ describe("cron run log", () => { expect(entries).toHaveLength(1); expect(entries[0]?.ts).toBe(2); expect(entries[0]?.delivered).toBe(true); + expect(entries[0]?.deliveryStatus).toBe("not-delivered"); + expect(entries[0]?.deliveryError).toBe("announce failed"); }); }); diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 0a2c74959..0fd6de76e 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -1,6 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; -import type { CronRunStatus, CronRunTelemetry } from "./types.js"; +import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js"; export type CronRunLogEntry = { ts: number; @@ -10,6 +10,8 @@ export type CronRunLogEntry = { error?: string; summary?: string; delivered?: boolean; + deliveryStatus?: CronDeliveryStatus; + deliveryError?: string; sessionId?: string; sessionKey?: string; runAtMs?: number; @@ -131,6 +133,17 @@ export async function readCronRunLogEntries( if (typeof obj.delivered === "boolean") { entry.delivered = obj.delivered; } + if ( + obj.deliveryStatus === "delivered" || + obj.deliveryStatus === "not-delivered" || + obj.deliveryStatus === "unknown" || + obj.deliveryStatus === "not-requested" + ) { + entry.deliveryStatus = obj.deliveryStatus; + } + if (typeof obj.deliveryError === "string") { + entry.deliveryError = obj.deliveryError; + } if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) { entry.sessionId = obj.sessionId; } diff --git a/src/cron/service.persists-delivered-status.test.ts b/src/cron/service.persists-delivered-status.test.ts index 4af3dd575..10c8319fb 100644 --- a/src/cron/service.persists-delivered-status.test.ts +++ b/src/cron/service.persists-delivered-status.test.ts @@ -40,7 +40,7 @@ function buildMainSessionSystemEventJob(name: string): CronAddInput { function createIsolatedCronWithFinishedBarrier(params: { storePath: string; delivered?: boolean; - onFinished?: (evt: { jobId: string; delivered?: boolean }) => void; + onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void; }) { const finished = createFinishedBarrier(); const cron = new CronService({ @@ -56,7 +56,11 @@ function createIsolatedCronWithFinishedBarrier(params: { })), onEvent: (evt) => { if (evt.action === "finished") { - params.onFinished?.({ jobId: evt.jobId, delivered: evt.delivered }); + params.onFinished?.({ + jobId: evt.jobId, + delivered: evt.delivered, + deliveryStatus: evt.deliveryStatus, + }); } finished.onEvent(evt); }, @@ -94,7 +98,10 @@ describe("CronService persists delivered status", () => { }); expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBe(true); + expect(updated?.state.lastDeliveryStatus).toBe("delivered"); + expect(updated?.state.lastDeliveryError).toBeUndefined(); cron.stop(); }); @@ -114,12 +121,15 @@ describe("CronService persists delivered status", () => { }); expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBe(false); + expect(updated?.state.lastDeliveryStatus).toBe("not-delivered"); + expect(updated?.state.lastDeliveryError).toBeUndefined(); cron.stop(); }); - it("persists lastDelivered=undefined when isolated job does not deliver", async () => { + it("persists not-requested delivery state when delivery is not configured", async () => { const store = await makeStorePath(); const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, @@ -133,7 +143,35 @@ describe("CronService persists delivered status", () => { }); expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBeUndefined(); + expect(updated?.state.lastDeliveryStatus).toBe("not-requested"); + expect(updated?.state.lastDeliveryError).toBeUndefined(); + + cron.stop(); + }); + + it("persists unknown delivery state when delivery is requested but the runner omits delivered", async () => { + const store = await makeStorePath(); + const { cron, finished } = createIsolatedCronWithFinishedBarrier({ + storePath: store.storePath, + }); + + await cron.start(); + const { updated } = await runSingleJobAndReadState({ + cron, + finished, + job: { + ...buildIsolatedAgentTurnJob("delivery-unknown"), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + }); + + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunStatus).toBe("ok"); + expect(updated?.state.lastDelivered).toBeUndefined(); + expect(updated?.state.lastDeliveryStatus).toBe("unknown"); + expect(updated?.state.lastDeliveryError).toBeUndefined(); cron.stop(); }); @@ -153,7 +191,9 @@ describe("CronService persists delivered status", () => { }); expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBeUndefined(); + expect(updated?.state.lastDeliveryStatus).toBe("not-requested"); expect(enqueueSystemEvent).toHaveBeenCalled(); cron.stop(); @@ -161,7 +201,7 @@ describe("CronService persists delivered status", () => { it("emits delivered in the finished event", async () => { const store = await makeStorePath(); - let capturedEvent: { jobId: string; delivered?: boolean } | undefined; + let capturedEvent: { jobId: string; delivered?: boolean; deliveryStatus?: string } | undefined; const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, delivered: true, @@ -179,6 +219,7 @@ describe("CronService persists delivered status", () => { expect(capturedEvent).toBeDefined(); expect(capturedEvent?.delivered).toBe(true); + expect(capturedEvent?.deliveryStatus).toBe("delivered"); cron.stop(); }); }); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index bea2af86c..5b7731579 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -309,6 +309,8 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f error: coreResult.error, summary: coreResult.summary, delivered: coreResult.delivered, + deliveryStatus: job.state.lastDeliveryStatus, + deliveryError: job.state.lastDeliveryError, sessionId: coreResult.sessionId, sessionKey: coreResult.sessionKey, runAtMs: prepared.startedAt, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index b366da7ab..19b139b37 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,6 +1,7 @@ import type { CronConfig } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { + CronDeliveryStatus, CronJob, CronJobCreate, CronJobPatch, @@ -19,6 +20,8 @@ export type CronEvent = { error?: string; summary?: string; delivered?: boolean; + deliveryStatus?: CronDeliveryStatus; + deliveryError?: string; sessionId?: string; sessionKey?: string; nextRunAtMs?: number; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index a99d2acec..7f5bc01c0 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -2,7 +2,13 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import { sweepCronRunSessions } from "../session-reaper.js"; -import type { CronJob, CronRunOutcome, CronRunStatus, CronRunTelemetry } from "../types.js"; +import type { + CronDeliveryStatus, + CronJob, + CronRunOutcome, + CronRunStatus, + CronRunTelemetry, +} from "../types.js"; import { computeJobNextRunAtMs, nextWakeAtMs, @@ -63,6 +69,16 @@ function errorBackoffMs(consecutiveErrors: number): number { return ERROR_BACKOFF_SCHEDULE_MS[Math.max(0, idx)]; } +function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): CronDeliveryStatus { + if (params.delivered === true) { + return "delivered"; + } + if (params.delivered === false) { + return "not-delivered"; + } + return resolveCronDeliveryPlan(params.job).requested ? "unknown" : "not-requested"; +} + /** * Apply the result of a job execution to the job's state. * Handles consecutive error tracking, exponential backoff, one-shot disable, @@ -81,10 +97,15 @@ export function applyJobResult( ): boolean { job.state.runningAtMs = undefined; job.state.lastRunAtMs = result.startedAt; + job.state.lastRunStatus = result.status; job.state.lastStatus = result.status; job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt); job.state.lastError = result.error; job.state.lastDelivered = result.delivered; + const deliveryStatus = resolveDeliveryStatus({ job, delivered: result.delivered }); + job.state.lastDeliveryStatus = deliveryStatus; + job.state.lastDeliveryError = + deliveryStatus === "not-delivered" && result.error ? result.error : undefined; job.updatedAtMs = result.endedAt; // Track consecutive errors for backoff / auto-disable. @@ -748,6 +769,8 @@ function emitJobFinished( error: result.error, summary: result.summary, delivered: result.delivered, + deliveryStatus: job.state.lastDeliveryStatus, + deliveryError: job.state.lastDeliveryError, sessionId: result.sessionId, sessionKey: result.sessionKey, runAtMs, diff --git a/src/cron/types.ts b/src/cron/types.ts index 36a5c28fa..ec1f8752c 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -28,6 +28,7 @@ export type CronDelivery = { export type CronDeliveryPatch = Partial; export type CronRunStatus = "ok" | "error" | "skipped"; +export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested"; export type CronUsageSummary = { input_tokens?: number; @@ -86,6 +87,9 @@ export type CronJobState = { nextRunAtMs?: number; runningAtMs?: number; lastRunAtMs?: number; + /** Preferred execution outcome field. */ + lastRunStatus?: CronRunStatus; + /** Back-compat alias for lastRunStatus. */ lastStatus?: "ok" | "error" | "skipped"; lastError?: string; lastDurationMs?: number; @@ -93,6 +97,10 @@ export type CronJobState = { consecutiveErrors?: number; /** Number of consecutive schedule computation errors. Auto-disables job after threshold. */ scheduleErrorCount?: number; + /** Explicit delivery outcome, separate from execution outcome. */ + lastDeliveryStatus?: CronDeliveryStatus; + /** Delivery-specific error text when available. */ + lastDeliveryError?: string; /** Whether the last run's output was delivered to the target channel. */ lastDelivered?: boolean; }; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index c2e0d0620..b162436d0 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -21,6 +21,17 @@ function cronAgentTurnPayloadSchema(params: { message: TSchema }) { const CronSessionTargetSchema = Type.Union([Type.Literal("main"), Type.Literal("isolated")]); const CronWakeModeSchema = Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]); +const CronRunStatusSchema = Type.Union([ + Type.Literal("ok"), + Type.Literal("error"), + Type.Literal("skipped"), +]); +const CronDeliveryStatusSchema = Type.Union([ + Type.Literal("delivered"), + Type.Literal("not-delivered"), + Type.Literal("unknown"), + Type.Literal("not-requested"), +]); const CronCommonOptionalFields = { agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), @@ -151,13 +162,14 @@ export const CronJobStateSchema = Type.Object( nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), runningAtMs: Type.Optional(Type.Integer({ minimum: 0 })), lastRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), - lastStatus: Type.Optional( - Type.Union([Type.Literal("ok"), Type.Literal("error"), Type.Literal("skipped")]), - ), + lastRunStatus: Type.Optional(CronRunStatusSchema), + lastStatus: Type.Optional(CronRunStatusSchema), lastError: Type.Optional(Type.String()), lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })), consecutiveErrors: Type.Optional(Type.Integer({ minimum: 0 })), lastDelivered: Type.Optional(Type.Boolean()), + lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), + lastDeliveryError: Type.Optional(Type.String()), }, { additionalProperties: false }, ); @@ -238,11 +250,12 @@ export const CronRunLogEntrySchema = Type.Object( ts: Type.Integer({ minimum: 0 }), jobId: NonEmptyString, action: Type.Literal("finished"), - status: Type.Optional( - Type.Union([Type.Literal("ok"), Type.Literal("error"), Type.Literal("skipped")]), - ), + status: Type.Optional(CronRunStatusSchema), error: Type.Optional(Type.String()), summary: Type.Optional(Type.String()), + delivered: Type.Optional(Type.Boolean()), + deliveryStatus: Type.Optional(CronDeliveryStatusSchema), + deliveryError: Type.Optional(Type.String()), sessionId: Type.Optional(NonEmptyString), sessionKey: Type.Optional(NonEmptyString), runAtMs: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index b0b2de28c..be6f63ed1 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -297,6 +297,8 @@ export function buildGatewayCronService(params: { error: evt.error, summary: evt.summary, delivered: evt.delivered, + deliveryStatus: evt.deliveryStatus, + deliveryError: evt.deliveryError, sessionId: evt.sessionId, sessionKey: evt.sessionKey, runAtMs: evt.runAtMs, diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 9610d46c7..ed924f792 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -407,11 +407,13 @@ describe("gateway server cron", () => { action?: unknown; status?: unknown; summary?: unknown; + deliveryStatus?: unknown; }; expect(last.action).toBe("finished"); expect(last.jobId).toBe(jobId); expect(last.status).toBe("ok"); expect(last.summary).toBe("hello"); + expect(last.deliveryStatus).toBe("not-requested"); const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 }); expect(runsRes.ok).toBe(true); @@ -419,6 +421,9 @@ describe("gateway server cron", () => { expect(Array.isArray(entries)).toBe(true); expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId); expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello"); + expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe( + "not-requested", + ); const statusRes = await rpcReq(ws, "cron.status", {}); expect(statusRes.ok).toBe(true);