From 313e2f2e85eac90c89f6fae47cf22a93a235c8f0 Mon Sep 17 00:00:00 2001 From: Igor Markelov Date: Fri, 6 Feb 2026 07:43:37 +0800 Subject: [PATCH] fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer (#9823) * fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer ensureLoaded(forceReload) called recomputeNextRuns before runDueJobs, which recalculated nextRunAtMs to a strictly future time. Since setTimeout always fires a few ms late, the due check (now >= nextRunAtMs) always failed and every/cron jobs never executed. Fixes #9788. * docs: add changelog entry for cron timer race fix (#9823) (thanks @pycckuu) --------- Co-authored-by: Tyler Yust --- CHANGELOG.md | 1 + src/cron/service.every-jobs-fire.test.ts | 127 +++++++++++++++++++++++ src/cron/service/store.ts | 15 ++- src/cron/service/timer.ts | 13 ++- 4 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 src/cron/service.every-jobs-fire.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9661c7453..8e2e1e0f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ Docs: https://docs.openclaw.ai - Voice call: add regression coverage for anonymous inbound caller IDs with allowlist policy. (#8104) Thanks @victormier. - Cron: accept epoch timestamps and 0ms durations in CLI `--at` parsing. - Cron: reload store data when the store file is recreated or mtime changes. +- Cron: prevent `recomputeNextRuns` from skipping due jobs when timer fires late by reordering `onTimer` flow. (#9823, fixes #9788) Thanks @pycckuu. - Cron: deliver announce runs directly, honor delivery mode, and respect wakeMode for summaries. (#8540) Thanks @tyler6204. - Cron: correct announce delivery inference for thread session keys and null delivery inputs. (#9733) Thanks @tyler6204. - Telegram: include forward_from_chat metadata in forwarded messages and harden cron delivery target checks. (#8392) Thanks @Glucksberg. diff --git a/src/cron/service.every-jobs-fire.test.ts b/src/cron/service.every-jobs-fire.test.ts new file mode 100644 index 000000000..a6a2bab80 --- /dev/null +++ b/src/cron/service.every-jobs-fire.test.ts @@ -0,0 +1,127 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService interval/cron jobs fire on time", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("fires an every-type main job when the timer fires a few ms late", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "every 10s check", + enabled: true, + schedule: { kind: "every", everyMs: 10_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + }); + + const firstDueAt = job.state.nextRunAtMs!; + expect(firstDueAt).toBe(Date.parse("2025-12-13T00:00:00.000Z") + 10_000); + + // Simulate setTimeout firing 5ms late (the race condition). + vi.setSystemTime(new Date(firstDueAt + 5)); + await vi.runOnlyPendingTimersAsync(); + + // Wait for the async onTimer to complete via the lock queue. + const jobs = await cron.list(); + const updated = jobs.find((j) => j.id === job.id); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("tick", { agentId: undefined }); + expect(updated?.state.lastStatus).toBe("ok"); + // nextRunAtMs must advance by at least one full interval past the due time. + expect(updated?.state.nextRunAtMs).toBeGreaterThanOrEqual(firstDueAt + 10_000); + + cron.stop(); + await store.cleanup(); + }); + + it("fires a cron-expression job when the timer fires a few ms late", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + // Set time to just before a minute boundary. + vi.setSystemTime(new Date("2025-12-13T00:00:59.000Z")); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "every minute check", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "cron-tick" }, + }); + + const firstDueAt = job.state.nextRunAtMs!; + + // Simulate setTimeout firing 5ms late. + vi.setSystemTime(new Date(firstDueAt + 5)); + await vi.runOnlyPendingTimersAsync(); + + // Wait for the async onTimer to complete via the lock queue. + const jobs = await cron.list(); + const updated = jobs.find((j) => j.id === job.id); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", { agentId: undefined }); + expect(updated?.state.lastStatus).toBe("ok"); + // nextRunAtMs should be the next whole-minute boundary (60s later). + expect(updated?.state.nextRunAtMs).toBe(firstDueAt + 60_000); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 659178d75..51aca4165 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -126,7 +126,15 @@ async function getFileMtimeMs(path: string): Promise { } } -export async function ensureLoaded(state: CronServiceState, opts?: { forceReload?: boolean }) { +export async function ensureLoaded( + state: CronServiceState, + opts?: { + forceReload?: boolean; + /** Skip recomputing nextRunAtMs after load so the caller can run due + * jobs against the persisted values first (see onTimer). */ + skipRecompute?: boolean; + }, +) { // Fast path: store is already in memory. Other callers (add, list, run, …) // trust the in-memory copy to avoid a stat syscall on every operation. if (state.store && !opts?.forceReload) { @@ -255,8 +263,9 @@ export async function ensureLoaded(state: CronServiceState, opts?: { forceReload state.storeLoadedAtMs = state.deps.nowMs(); state.storeFileMtimeMs = fileMtimeMs; - // Recompute next runs after loading to ensure accuracy - recomputeNextRuns(state); + if (!opts?.skipRecompute) { + recomputeNextRuns(state); + } if (mutated) { await persist(state); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 41ee103b9..b85ee564e 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,7 +1,12 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { CronJob } from "../types.js"; import type { CronEvent, CronServiceState } from "./state.js"; -import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js"; +import { + computeJobNextRunAtMs, + nextWakeAtMs, + recomputeNextRuns, + resolveJobPayloadTextForMain, +} from "./jobs.js"; import { locked } from "./locked.js"; import { ensureLoaded, persist } from "./store.js"; @@ -36,8 +41,12 @@ export async function onTimer(state: CronServiceState) { state.running = true; try { await locked(state, async () => { - await ensureLoaded(state, { forceReload: true }); + // Reload persisted due-times without recomputing so runDueJobs sees + // the original nextRunAtMs values. Recomputing first would advance + // every/cron slots past the current tick when the timer fires late (#9788). + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); await runDueJobs(state); + recomputeNextRuns(state); await persist(state); armTimer(state); });