feat(cron): configurable failure alerts for repeated job errors (openclaw#24789) thanks @0xbrak

Verified:
- pnpm install --frozen-lockfile
- pnpm check
- pnpm test -- --run src/cron/service.failure-alert.test.ts src/cli/cron-cli.test.ts src/gateway/protocol/cron-validators.test.ts

Co-authored-by: 0xbrak <181251288+0xbrak@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
0xbrak
2026-03-01 09:18:15 -05:00
committed by GitHub
parent f902697bd5
commit 4637b90c07
18 changed files with 842 additions and 1 deletions

View File

@@ -551,4 +551,53 @@ describe("cron cli", () => {
it("rejects --exact on edit when existing job is not cron", async () => {
await expectCronEditWithScheduleLookupExit({ kind: "every", everyMs: 60_000 }, ["--exact"]);
});
it("patches failure alert settings on cron edit", async () => {
callGatewayFromCli.mockClear();
const program = buildProgram();
await program.parseAsync(
[
"cron",
"edit",
"job-1",
"--failure-alert-after",
"3",
"--failure-alert-cooldown",
"1h",
"--failure-alert-channel",
"telegram",
"--failure-alert-to",
"19098680",
],
{ from: "user" },
);
const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update");
const patch = updateCall?.[2] as {
patch?: {
failureAlert?: { after?: number; cooldownMs?: number; channel?: string; to?: string };
};
};
expect(patch?.patch?.failureAlert?.after).toBe(3);
expect(patch?.patch?.failureAlert?.cooldownMs).toBe(3_600_000);
expect(patch?.patch?.failureAlert?.channel).toBe("telegram");
expect(patch?.patch?.failureAlert?.to).toBe("19098680");
});
it("supports --no-failure-alert on cron edit", async () => {
callGatewayFromCli.mockClear();
const program = buildProgram();
await program.parseAsync(["cron", "edit", "job-1", "--no-failure-alert"], {
from: "user",
});
const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update");
const patch = updateCall?.[2] as { patch?: { failureAlert?: boolean } };
expect(patch?.patch?.failureAlert).toBe(false);
});
});

View File

@@ -62,6 +62,15 @@ export function registerCronEditCommand(cron: Command) {
.option("--account <id>", "Channel account id for delivery (multi-account setups)")
.option("--best-effort-deliver", "Do not fail job if delivery fails")
.option("--no-best-effort-deliver", "Fail job when delivery fails")
.option("--failure-alert", "Enable failure alerts for this job")
.option("--no-failure-alert", "Disable failure alerts for this job")
.option("--failure-alert-after <n>", "Alert after N consecutive job errors")
.option(
"--failure-alert-channel <channel>",
`Failure alert channel (${getCronChannelOptions()})`,
)
.option("--failure-alert-to <dest>", "Failure alert destination")
.option("--failure-alert-cooldown <duration>", "Minimum time between alerts (e.g. 1h, 30m)")
.action(async (id, opts) => {
try {
if (opts.session === "main" && opts.message) {
@@ -264,6 +273,49 @@ export function registerCronEditCommand(cron: Command) {
patch.delivery = delivery;
}
const hasFailureAlertAfter = typeof opts.failureAlertAfter === "string";
const hasFailureAlertChannel = typeof opts.failureAlertChannel === "string";
const hasFailureAlertTo = typeof opts.failureAlertTo === "string";
const hasFailureAlertCooldown = typeof opts.failureAlertCooldown === "string";
const hasFailureAlertFields =
hasFailureAlertAfter ||
hasFailureAlertChannel ||
hasFailureAlertTo ||
hasFailureAlertCooldown;
const failureAlertFlag =
typeof opts.failureAlert === "boolean" ? opts.failureAlert : undefined;
if (failureAlertFlag === false && hasFailureAlertFields) {
throw new Error("Use --no-failure-alert alone (without failure-alert-* options).");
}
if (failureAlertFlag === false) {
patch.failureAlert = false;
} else if (failureAlertFlag === true || hasFailureAlertFields) {
const failureAlert: Record<string, unknown> = {};
if (hasFailureAlertAfter) {
const after = Number.parseInt(String(opts.failureAlertAfter), 10);
if (!Number.isFinite(after) || after <= 0) {
throw new Error("Invalid --failure-alert-after (must be a positive integer).");
}
failureAlert.after = after;
}
if (hasFailureAlertChannel) {
const channel = String(opts.failureAlertChannel).trim().toLowerCase();
failureAlert.channel = channel ? channel : undefined;
}
if (hasFailureAlertTo) {
const to = String(opts.failureAlertTo).trim();
failureAlert.to = to ? to : undefined;
}
if (hasFailureAlertCooldown) {
const cooldownMs = parseDurationMs(String(opts.failureAlertCooldown));
if (!cooldownMs && cooldownMs !== 0) {
throw new Error("Invalid --failure-alert-cooldown.");
}
failureAlert.cooldownMs = cooldownMs;
}
patch.failureAlert = failureAlert;
}
const res = await callGatewayFromCli("cron.update", opts, {
id,
patch,

View File

@@ -10,6 +10,12 @@ export type CronRetryConfig = {
retryOn?: CronRetryOn[];
};
export type CronFailureAlertConfig = {
enabled?: boolean;
after?: number;
cooldownMs?: number;
};
export type CronConfig = {
enabled?: boolean;
store?: string;
@@ -37,4 +43,5 @@ export type CronConfig = {
maxBytes?: number | string;
keepLines?: number;
};
failureAlert?: CronFailureAlertConfig;
};

View File

@@ -395,6 +395,14 @@ export const OpenClawSchema = z
})
.strict()
.optional(),
failureAlert: z
.object({
enabled: z.boolean().optional(),
after: z.number().int().min(1).optional(),
cooldownMs: z.number().int().min(0).optional(),
})
.strict()
.optional(),
})
.strict()
.superRefine((val, ctx) => {

View File

@@ -0,0 +1,198 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-failure-alert-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
describe("CronService failure alerts", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("alerts after configured consecutive failures and honors cooldown", async () => {
const store = await makeStorePath();
const sendCronFailureAlert = vi.fn(async () => undefined);
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
error: "wrong model id",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
cronConfig: {
failureAlert: {
enabled: true,
after: 2,
cooldownMs: 60_000,
},
},
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
sendCronFailureAlert,
});
await cron.start();
const job = await cron.add({
name: "daily report",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "run report" },
delivery: { mode: "announce", channel: "telegram", to: "19098680" },
});
await cron.run(job.id, "force");
expect(sendCronFailureAlert).not.toHaveBeenCalled();
await cron.run(job.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
expect(sendCronFailureAlert).toHaveBeenLastCalledWith(
expect.objectContaining({
job: expect.objectContaining({ id: job.id }),
channel: "telegram",
to: "19098680",
text: expect.stringContaining('Cron job "daily report" failed 2 times'),
}),
);
await cron.run(job.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
vi.advanceTimersByTime(60_000);
await cron.run(job.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(2);
expect(sendCronFailureAlert).toHaveBeenLastCalledWith(
expect.objectContaining({
text: expect.stringContaining('Cron job "daily report" failed 4 times'),
}),
);
cron.stop();
await store.cleanup();
});
it("supports per-job failure alert override when global alerts are disabled", async () => {
const store = await makeStorePath();
const sendCronFailureAlert = vi.fn(async () => undefined);
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
error: "timeout",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
cronConfig: {
failureAlert: {
enabled: false,
},
},
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
sendCronFailureAlert,
});
await cron.start();
const job = await cron.add({
name: "job with override",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "run report" },
failureAlert: {
after: 1,
channel: "telegram",
to: "12345",
cooldownMs: 1,
},
});
await cron.run(job.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
expect(sendCronFailureAlert).toHaveBeenLastCalledWith(
expect.objectContaining({
channel: "telegram",
to: "12345",
}),
);
cron.stop();
await store.cleanup();
});
it("respects per-job failureAlert=false and suppresses alerts", async () => {
const store = await makeStorePath();
const sendCronFailureAlert = vi.fn(async () => undefined);
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
error: "auth error",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
cronConfig: {
failureAlert: {
enabled: true,
after: 1,
},
},
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
sendCronFailureAlert,
});
await cron.start();
const job = await cron.add({
name: "disabled alert job",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "run report" },
failureAlert: false,
});
await cron.run(job.id, "force");
await cron.run(job.id, "force");
expect(sendCronFailureAlert).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
});

View File

@@ -9,6 +9,7 @@ import {
import type {
CronDelivery,
CronDeliveryPatch,
CronFailureAlert,
CronJob,
CronJobCreate,
CronJobPatch,
@@ -419,6 +420,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
wakeMode: input.wakeMode,
payload: input.payload,
delivery: input.delivery,
failureAlert: input.failureAlert,
state: {
...input.state,
},
@@ -483,6 +485,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
if (patch.delivery) {
job.delivery = mergeCronDelivery(job.delivery, patch.delivery);
}
if ("failureAlert" in patch) {
job.failureAlert = mergeCronFailureAlert(job.failureAlert, patch.failureAlert);
}
if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") {
job.delivery = undefined;
}
@@ -648,6 +653,42 @@ function mergeCronDelivery(
return next;
}
function mergeCronFailureAlert(
existing: CronFailureAlert | false | undefined,
patch: CronFailureAlert | false | undefined,
): CronFailureAlert | false | undefined {
if (patch === false) {
return false;
}
if (patch === undefined) {
return existing;
}
const base = existing === false || existing === undefined ? {} : existing;
const next: CronFailureAlert = { ...base };
if ("after" in patch) {
const after = typeof patch.after === "number" && Number.isFinite(patch.after) ? patch.after : 0;
next.after = after > 0 ? Math.floor(after) : undefined;
}
if ("channel" in patch) {
const channel = typeof patch.channel === "string" ? patch.channel.trim() : "";
next.channel = channel ? channel : undefined;
}
if ("to" in patch) {
const to = typeof patch.to === "string" ? patch.to.trim() : "";
next.to = to ? to : undefined;
}
if ("cooldownMs" in patch) {
const cooldownMs =
typeof patch.cooldownMs === "number" && Number.isFinite(patch.cooldownMs)
? patch.cooldownMs
: -1;
next.cooldownMs = cooldownMs >= 0 ? Math.floor(cooldownMs) : undefined;
}
return next;
}
export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) {
if (!job.state) {
job.state = {};

View File

@@ -5,6 +5,7 @@ import type {
CronJob,
CronJobCreate,
CronJobPatch,
CronMessageChannel,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
@@ -90,6 +91,12 @@ export type CronServiceDeps = {
} & CronRunOutcome &
CronRunTelemetry
>;
sendCronFailureAlert?: (params: {
job: CronJob;
text: string;
channel: CronMessageChannel;
to?: string;
}) => Promise<void>;
onEvent?: (evt: CronEvent) => void;
};

View File

@@ -6,6 +6,7 @@ import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,
CronJob,
CronMessageChannel,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
@@ -33,6 +34,8 @@ const MAX_TIMER_DELAY_MS = 60_000;
* but always breaks an infinite re-trigger cycle. (See #17821)
*/
const MIN_REFIRE_GAP_MS = 2_000;
const DEFAULT_FAILURE_ALERT_AFTER = 2;
const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
type TimedCronRunOutcome = CronRunOutcome &
CronRunTelemetry & {
@@ -149,6 +152,106 @@ function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): C
return resolveCronDeliveryPlan(params.job).requested ? "unknown" : "not-requested";
}
function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined {
if (typeof input !== "string") {
return undefined;
}
const channel = input.trim().toLowerCase();
return channel ? (channel as CronMessageChannel) : undefined;
}
function normalizeTo(input: unknown): string | undefined {
if (typeof input !== "string") {
return undefined;
}
const to = input.trim();
return to ? to : undefined;
}
function clampPositiveInt(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const floored = Math.floor(value);
return floored >= 1 ? floored : fallback;
}
function clampNonNegativeInt(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const floored = Math.floor(value);
return floored >= 0 ? floored : fallback;
}
function resolveFailureAlert(
state: CronServiceState,
job: CronJob,
): { after: number; cooldownMs: number; channel: CronMessageChannel; to?: string } | null {
const globalConfig = state.deps.cronConfig?.failureAlert;
const jobConfig = job.failureAlert === false ? undefined : job.failureAlert;
if (job.failureAlert === false) {
return null;
}
if (!jobConfig && globalConfig?.enabled !== true) {
return null;
}
return {
after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER),
cooldownMs: clampNonNegativeInt(
jobConfig?.cooldownMs ?? globalConfig?.cooldownMs,
DEFAULT_FAILURE_ALERT_COOLDOWN_MS,
),
channel:
normalizeCronMessageChannel(jobConfig?.channel) ??
normalizeCronMessageChannel(job.delivery?.channel) ??
"last",
to: normalizeTo(jobConfig?.to) ?? normalizeTo(job.delivery?.to),
};
}
function emitFailureAlert(
state: CronServiceState,
params: {
job: CronJob;
error?: string;
consecutiveErrors: number;
channel: CronMessageChannel;
to?: string;
},
) {
const safeJobName = params.job.name || params.job.id;
const truncatedError = (params.error?.trim() || "unknown error").slice(0, 200);
const text = [
`Cron job "${safeJobName}" failed ${params.consecutiveErrors} times`,
`Last error: ${truncatedError}`,
].join("\n");
if (state.deps.sendCronFailureAlert) {
void state.deps
.sendCronFailureAlert({
job: params.job,
text,
channel: params.channel,
to: params.to,
})
.catch((err) => {
state.deps.log.warn(
{ jobId: params.job.id, err: String(err) },
"cron: failure alert delivery failed",
);
});
return;
}
state.deps.enqueueSystemEvent(text, { agentId: params.job.agentId });
if (params.job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${params.job.id}:failure-alert` });
}
}
/**
* Apply the result of a job execution to the job's state.
* Handles consecutive error tracking, exponential backoff, one-shot disable,
@@ -181,8 +284,26 @@ export function applyJobResult(
// Track consecutive errors for backoff / auto-disable.
if (result.status === "error") {
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
const alertConfig = resolveFailureAlert(state, job);
if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) {
const now = state.deps.nowMs();
const lastAlert = job.state.lastFailureAlertAtMs;
const inCooldown =
typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs);
if (!inCooldown) {
emitFailureAlert(state, {
job,
error: result.error,
consecutiveErrors: job.state.consecutiveErrors,
channel: alertConfig.channel,
to: alertConfig.to,
});
job.state.lastFailureAlertAtMs = now;
}
}
} else {
job.state.consecutiveErrors = 0;
job.state.lastFailureAlertAtMs = undefined;
}
const shouldDelete =

View File

@@ -56,6 +56,13 @@ export type CronRunOutcome = {
sessionKey?: string;
};
export type CronFailureAlert = {
after?: number;
channel?: CronMessageChannel;
to?: string;
cooldownMs?: number;
};
export type CronPayload =
| { kind: "systemEvent"; text: string }
| {
@@ -102,6 +109,8 @@ export type CronJobState = {
lastDurationMs?: number;
/** Number of consecutive execution errors (reset on success). Used for backoff. */
consecutiveErrors?: number;
/** Last failure alert timestamp (ms since epoch) for cooldown gating. */
lastFailureAlertAtMs?: number;
/** Number of consecutive schedule computation errors. Auto-disables job after threshold. */
scheduleErrorCount?: number;
/** Explicit delivery outcome, separate from execution outcome. */
@@ -128,6 +137,7 @@ export type CronJob = {
wakeMode: CronWakeMode;
payload: CronPayload;
delivery?: CronDelivery;
failureAlert?: CronFailureAlert | false;
state: CronJobState;
};

View File

@@ -187,6 +187,16 @@ export const CronDeliveryPatchSchema = Type.Object(
{ additionalProperties: false },
);
export const CronFailureAlertSchema = Type.Object(
{
after: Type.Optional(Type.Integer({ minimum: 1 })),
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
to: Type.Optional(Type.String()),
cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const CronJobStateSchema = Type.Object(
{
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
@@ -200,6 +210,7 @@ export const CronJobStateSchema = Type.Object(
lastDelivered: Type.Optional(Type.Boolean()),
lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
lastDeliveryError: Type.Optional(Type.String()),
lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
@@ -220,6 +231,7 @@ export const CronJobSchema = Type.Object(
wakeMode: CronWakeModeSchema,
payload: CronPayloadSchema,
delivery: Type.Optional(CronDeliverySchema),
failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])),
state: CronJobStateSchema,
},
{ additionalProperties: false },
@@ -249,6 +261,7 @@ export const CronAddParamsSchema = Type.Object(
wakeMode: CronWakeModeSchema,
payload: CronPayloadSchema,
delivery: Type.Optional(CronDeliverySchema),
failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])),
},
{ additionalProperties: false },
);
@@ -262,6 +275,7 @@ export const CronJobPatchSchema = Type.Object(
wakeMode: Type.Optional(CronWakeModeSchema),
payload: Type.Optional(CronPayloadPatchSchema),
delivery: Type.Optional(CronDeliveryPatchSchema),
failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])),
state: Type.Optional(Type.Partial(CronJobStateSchema)),
},
{ additionalProperties: false },

View File

@@ -1,5 +1,6 @@
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import { loadConfig } from "../config/config.js";
import {
canonicalizeMainSessionAlias,
@@ -8,6 +9,7 @@ import {
} from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js";
import {
appendCronRunLog,
resolveCronRunLogPath,
@@ -21,6 +23,7 @@ import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
import { SsrFBlockedError } from "../infra/net/ssrf.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
@@ -223,6 +226,25 @@ export function buildGatewayCronService(params: {
lane: "cron",
});
},
sendCronFailureAlert: async ({ job, text, channel, to }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
const target = await resolveDeliveryTarget(runtimeConfig, agentId, {
channel,
to,
});
if (!target.ok) {
throw target.error;
}
await deliverOutboundPayloads({
cfg: runtimeConfig,
channel: target.channel,
to: target.to,
accountId: target.accountId,
threadId: target.threadId,
payloads: [{ text }],
deps: createOutboundSendDeps(params.deps),
});
},
log: getChildLogger({ module: "cron", storePath }),
onEvent: (evt) => {
params.broadcast("cron", evt, { dropIfSlow: true });