cron: narrow startup replay backoff guard (#35391)
This commit is contained in:
@@ -286,6 +286,7 @@ describe("CronService restart catch-up", () => {
|
||||
nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"),
|
||||
lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
|
||||
lastStatus: "error",
|
||||
consecutiveErrors: 4,
|
||||
},
|
||||
},
|
||||
]);
|
||||
@@ -304,4 +305,50 @@ describe("CronService restart catch-up", () => {
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("replays missed cron slot after restart when error backoff has already elapsed", async () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
|
||||
await writeStoreJobs(store.storePath, [
|
||||
{
|
||||
id: "restart-backoff-elapsed-replay",
|
||||
name: "backoff elapsed replay",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"),
|
||||
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "replay after backoff elapsed" },
|
||||
state: {
|
||||
// Startup maintenance may already point to a future slot (04:11) even
|
||||
// though 04:01 was missed and the 30s error backoff has elapsed.
|
||||
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
|
||||
lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"),
|
||||
lastStatus: "error",
|
||||
consecutiveErrors: 1,
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
const cron = createRestartCronService({
|
||||
storePath: store.storePath,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith(
|
||||
"replay after backoff elapsed",
|
||||
expect.objectContaining({ agentId: undefined }),
|
||||
);
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -741,9 +741,10 @@ function isRunnableJob(params: {
|
||||
typeof next === "number" &&
|
||||
Number.isFinite(next) &&
|
||||
next > nowMs &&
|
||||
job.state.lastStatus === "error"
|
||||
isErrorBackoffPending(job, nowMs)
|
||||
) {
|
||||
// Respect persisted retry backoff windows for recurring jobs on restart.
|
||||
// Respect active retry backoff windows on restart, but allow missed-slot
|
||||
// replay once the backoff window has elapsed.
|
||||
return false;
|
||||
}
|
||||
if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") {
|
||||
@@ -766,6 +767,22 @@ function isRunnableJob(params: {
|
||||
return previousRunAtMs > lastRunAtMs;
|
||||
}
|
||||
|
||||
function isErrorBackoffPending(job: CronJob, nowMs: number): boolean {
|
||||
if (job.schedule.kind === "at" || job.state.lastStatus !== "error") {
|
||||
return false;
|
||||
}
|
||||
const lastRunAtMs = job.state.lastRunAtMs;
|
||||
if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) {
|
||||
return false;
|
||||
}
|
||||
const consecutiveErrorsRaw = job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
return nowMs < lastRunAtMs + errorBackoffMs(consecutiveErrors);
|
||||
}
|
||||
|
||||
function collectRunnableJobs(
|
||||
state: CronServiceState,
|
||||
nowMs: number,
|
||||
|
||||
Reference in New Issue
Block a user