fix(cron): split run and delivery status tracking

This commit is contained in:
Peter Steinberger
2026-02-22 20:07:34 +01:00
parent c3bb723673
commit aa4c250eb8
11 changed files with 128 additions and 13 deletions

View File

@@ -45,6 +45,7 @@ Docs: https://docs.openclaw.ai
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
- Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves.
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
- Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors.
- Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg.
- Feishu/Media: for inbound video messages that include both `file_key` (video) and `image_key` (thumbnail), prefer `file_key` when downloading media so video attachments are saved instead of silently failing on thumbnail keys. (#23633)
- Hooks/Cron: suppress duplicate main-session events for delivered hook turns and mark `SILENT_REPLY_TOKEN` (`NO_REPLY`) early exits as delivered to prevent hook context pollution. (#20678) Thanks @JonathanWorks.

View File

@@ -105,7 +105,7 @@ describe("cron run log", () => {
});
});
it("ignores invalid and non-finished lines while preserving delivered flag", async () => {
it("ignores invalid and non-finished lines while preserving delivery fields", async () => {
await withRunLogDir("openclaw-cron-log-filter-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
await fs.mkdir(path.dirname(logPath), { recursive: true });
@@ -120,6 +120,8 @@ describe("cron run log", () => {
action: "finished",
status: "ok",
delivered: true,
deliveryStatus: "not-delivered",
deliveryError: "announce failed",
}),
].join("\n") + "\n",
"utf-8",
@@ -129,6 +131,8 @@ describe("cron run log", () => {
expect(entries).toHaveLength(1);
expect(entries[0]?.ts).toBe(2);
expect(entries[0]?.delivered).toBe(true);
expect(entries[0]?.deliveryStatus).toBe("not-delivered");
expect(entries[0]?.deliveryError).toBe("announce failed");
});
});

View File

@@ -1,6 +1,6 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { CronRunStatus, CronRunTelemetry } from "./types.js";
import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js";
export type CronRunLogEntry = {
ts: number;
@@ -10,6 +10,8 @@ export type CronRunLogEntry = {
error?: string;
summary?: string;
delivered?: boolean;
deliveryStatus?: CronDeliveryStatus;
deliveryError?: string;
sessionId?: string;
sessionKey?: string;
runAtMs?: number;
@@ -131,6 +133,17 @@ export async function readCronRunLogEntries(
if (typeof obj.delivered === "boolean") {
entry.delivered = obj.delivered;
}
if (
obj.deliveryStatus === "delivered" ||
obj.deliveryStatus === "not-delivered" ||
obj.deliveryStatus === "unknown" ||
obj.deliveryStatus === "not-requested"
) {
entry.deliveryStatus = obj.deliveryStatus;
}
if (typeof obj.deliveryError === "string") {
entry.deliveryError = obj.deliveryError;
}
if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) {
entry.sessionId = obj.sessionId;
}

View File

@@ -40,7 +40,7 @@ function buildMainSessionSystemEventJob(name: string): CronAddInput {
function createIsolatedCronWithFinishedBarrier(params: {
storePath: string;
delivered?: boolean;
onFinished?: (evt: { jobId: string; delivered?: boolean }) => void;
onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void;
}) {
const finished = createFinishedBarrier();
const cron = new CronService({
@@ -56,7 +56,11 @@ function createIsolatedCronWithFinishedBarrier(params: {
})),
onEvent: (evt) => {
if (evt.action === "finished") {
params.onFinished?.({ jobId: evt.jobId, delivered: evt.delivered });
params.onFinished?.({
jobId: evt.jobId,
delivered: evt.delivered,
deliveryStatus: evt.deliveryStatus,
});
}
finished.onEvent(evt);
},
@@ -94,7 +98,10 @@ describe("CronService persists delivered status", () => {
});
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastDelivered).toBe(true);
expect(updated?.state.lastDeliveryStatus).toBe("delivered");
expect(updated?.state.lastDeliveryError).toBeUndefined();
cron.stop();
});
@@ -114,12 +121,15 @@ describe("CronService persists delivered status", () => {
});
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastDelivered).toBe(false);
expect(updated?.state.lastDeliveryStatus).toBe("not-delivered");
expect(updated?.state.lastDeliveryError).toBeUndefined();
cron.stop();
});
it("persists lastDelivered=undefined when isolated job does not deliver", async () => {
it("persists not-requested delivery state when delivery is not configured", async () => {
const store = await makeStorePath();
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
storePath: store.storePath,
@@ -133,7 +143,35 @@ describe("CronService persists delivered status", () => {
});
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastDelivered).toBeUndefined();
expect(updated?.state.lastDeliveryStatus).toBe("not-requested");
expect(updated?.state.lastDeliveryError).toBeUndefined();
cron.stop();
});
it("persists unknown delivery state when delivery is requested but the runner omits delivered", async () => {
const store = await makeStorePath();
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
storePath: store.storePath,
});
await cron.start();
const { updated } = await runSingleJobAndReadState({
cron,
finished,
job: {
...buildIsolatedAgentTurnJob("delivery-unknown"),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
});
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastDelivered).toBeUndefined();
expect(updated?.state.lastDeliveryStatus).toBe("unknown");
expect(updated?.state.lastDeliveryError).toBeUndefined();
cron.stop();
});
@@ -153,7 +191,9 @@ describe("CronService persists delivered status", () => {
});
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastDelivered).toBeUndefined();
expect(updated?.state.lastDeliveryStatus).toBe("not-requested");
expect(enqueueSystemEvent).toHaveBeenCalled();
cron.stop();
@@ -161,7 +201,7 @@ describe("CronService persists delivered status", () => {
it("emits delivered in the finished event", async () => {
const store = await makeStorePath();
let capturedEvent: { jobId: string; delivered?: boolean } | undefined;
let capturedEvent: { jobId: string; delivered?: boolean; deliveryStatus?: string } | undefined;
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
storePath: store.storePath,
delivered: true,
@@ -179,6 +219,7 @@ describe("CronService persists delivered status", () => {
expect(capturedEvent).toBeDefined();
expect(capturedEvent?.delivered).toBe(true);
expect(capturedEvent?.deliveryStatus).toBe("delivered");
cron.stop();
});
});

View File

@@ -309,6 +309,8 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
error: coreResult.error,
summary: coreResult.summary,
delivered: coreResult.delivered,
deliveryStatus: job.state.lastDeliveryStatus,
deliveryError: job.state.lastDeliveryError,
sessionId: coreResult.sessionId,
sessionKey: coreResult.sessionKey,
runAtMs: prepared.startedAt,

View File

@@ -1,6 +1,7 @@
import type { CronConfig } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type {
CronDeliveryStatus,
CronJob,
CronJobCreate,
CronJobPatch,
@@ -19,6 +20,8 @@ export type CronEvent = {
error?: string;
summary?: string;
delivered?: boolean;
deliveryStatus?: CronDeliveryStatus;
deliveryError?: string;
sessionId?: string;
sessionKey?: string;
nextRunAtMs?: number;

View File

@@ -2,7 +2,13 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type { CronJob, CronRunOutcome, CronRunStatus, CronRunTelemetry } from "../types.js";
import type {
CronDeliveryStatus,
CronJob,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
} from "../types.js";
import {
computeJobNextRunAtMs,
nextWakeAtMs,
@@ -63,6 +69,16 @@ function errorBackoffMs(consecutiveErrors: number): number {
return ERROR_BACKOFF_SCHEDULE_MS[Math.max(0, idx)];
}
function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): CronDeliveryStatus {
if (params.delivered === true) {
return "delivered";
}
if (params.delivered === false) {
return "not-delivered";
}
return resolveCronDeliveryPlan(params.job).requested ? "unknown" : "not-requested";
}
/**
* Apply the result of a job execution to the job's state.
* Handles consecutive error tracking, exponential backoff, one-shot disable,
@@ -81,10 +97,15 @@ export function applyJobResult(
): boolean {
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = result.startedAt;
job.state.lastRunStatus = result.status;
job.state.lastStatus = result.status;
job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt);
job.state.lastError = result.error;
job.state.lastDelivered = result.delivered;
const deliveryStatus = resolveDeliveryStatus({ job, delivered: result.delivered });
job.state.lastDeliveryStatus = deliveryStatus;
job.state.lastDeliveryError =
deliveryStatus === "not-delivered" && result.error ? result.error : undefined;
job.updatedAtMs = result.endedAt;
// Track consecutive errors for backoff / auto-disable.
@@ -748,6 +769,8 @@ function emitJobFinished(
error: result.error,
summary: result.summary,
delivered: result.delivered,
deliveryStatus: job.state.lastDeliveryStatus,
deliveryError: job.state.lastDeliveryError,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
runAtMs,

View File

@@ -28,6 +28,7 @@ export type CronDelivery = {
export type CronDeliveryPatch = Partial<CronDelivery>;
export type CronRunStatus = "ok" | "error" | "skipped";
export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested";
export type CronUsageSummary = {
input_tokens?: number;
@@ -86,6 +87,9 @@ export type CronJobState = {
nextRunAtMs?: number;
runningAtMs?: number;
lastRunAtMs?: number;
/** Preferred execution outcome field. */
lastRunStatus?: CronRunStatus;
/** Back-compat alias for lastRunStatus. */
lastStatus?: "ok" | "error" | "skipped";
lastError?: string;
lastDurationMs?: number;
@@ -93,6 +97,10 @@ export type CronJobState = {
consecutiveErrors?: number;
/** Number of consecutive schedule computation errors. Auto-disables job after threshold. */
scheduleErrorCount?: number;
/** Explicit delivery outcome, separate from execution outcome. */
lastDeliveryStatus?: CronDeliveryStatus;
/** Delivery-specific error text when available. */
lastDeliveryError?: string;
/** Whether the last run's output was delivered to the target channel. */
lastDelivered?: boolean;
};

View File

@@ -21,6 +21,17 @@ function cronAgentTurnPayloadSchema(params: { message: TSchema }) {
const CronSessionTargetSchema = Type.Union([Type.Literal("main"), Type.Literal("isolated")]);
const CronWakeModeSchema = Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]);
const CronRunStatusSchema = Type.Union([
Type.Literal("ok"),
Type.Literal("error"),
Type.Literal("skipped"),
]);
const CronDeliveryStatusSchema = Type.Union([
Type.Literal("delivered"),
Type.Literal("not-delivered"),
Type.Literal("unknown"),
Type.Literal("not-requested"),
]);
const CronCommonOptionalFields = {
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
@@ -151,13 +162,14 @@ export const CronJobStateSchema = Type.Object(
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
runningAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
lastRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
lastStatus: Type.Optional(
Type.Union([Type.Literal("ok"), Type.Literal("error"), Type.Literal("skipped")]),
),
lastRunStatus: Type.Optional(CronRunStatusSchema),
lastStatus: Type.Optional(CronRunStatusSchema),
lastError: Type.Optional(Type.String()),
lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })),
consecutiveErrors: Type.Optional(Type.Integer({ minimum: 0 })),
lastDelivered: Type.Optional(Type.Boolean()),
lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
lastDeliveryError: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
@@ -238,11 +250,12 @@ export const CronRunLogEntrySchema = Type.Object(
ts: Type.Integer({ minimum: 0 }),
jobId: NonEmptyString,
action: Type.Literal("finished"),
status: Type.Optional(
Type.Union([Type.Literal("ok"), Type.Literal("error"), Type.Literal("skipped")]),
),
status: Type.Optional(CronRunStatusSchema),
error: Type.Optional(Type.String()),
summary: Type.Optional(Type.String()),
delivered: Type.Optional(Type.Boolean()),
deliveryStatus: Type.Optional(CronDeliveryStatusSchema),
deliveryError: Type.Optional(Type.String()),
sessionId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
runAtMs: Type.Optional(Type.Integer({ minimum: 0 })),

View File

@@ -297,6 +297,8 @@ export function buildGatewayCronService(params: {
error: evt.error,
summary: evt.summary,
delivered: evt.delivered,
deliveryStatus: evt.deliveryStatus,
deliveryError: evt.deliveryError,
sessionId: evt.sessionId,
sessionKey: evt.sessionKey,
runAtMs: evt.runAtMs,

View File

@@ -407,11 +407,13 @@ describe("gateway server cron", () => {
action?: unknown;
status?: unknown;
summary?: unknown;
deliveryStatus?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
expect(last.status).toBe("ok");
expect(last.summary).toBe("hello");
expect(last.deliveryStatus).toBe("not-requested");
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
expect(runsRes.ok).toBe(true);
@@ -419,6 +421,9 @@ describe("gateway server cron", () => {
expect(Array.isArray(entries)).toBe(true);
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello");
expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe(
"not-requested",
);
const statusRes = await rpcReq(ws, "cron.status", {});
expect(statusRes.ok).toBe(true);