From 6b400eca5ce30b283fb6ff854f67a5af4d82cca7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 14 Feb 2026 21:44:30 +0000 Subject: [PATCH] refactor(cron): share job tick state normalization --- src/cron/service/jobs.ts | 83 +++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index c8fcdce43..71a11af7b 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -90,6 +90,43 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und /** Maximum consecutive schedule errors before auto-disabling a job. */ const MAX_SCHEDULE_ERRORS = 3; +function normalizeJobTickState(params: { state: CronServiceState; job: CronJob; nowMs: number }): { + changed: boolean; + skip: boolean; +} { + const { state, job, nowMs } = params; + let changed = false; + + if (!job.state) { + job.state = {}; + changed = true; + } + + if (!job.enabled) { + if (job.state.nextRunAtMs !== undefined) { + job.state.nextRunAtMs = undefined; + changed = true; + } + if (job.state.runningAtMs !== undefined) { + job.state.runningAtMs = undefined; + changed = true; + } + return { changed, skip: true }; + } + + const runningAt = job.state.runningAtMs; + if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) { + state.deps.log.warn( + { jobId: job.id, runningAtMs: runningAt }, + "cron: clearing stuck running marker", + ); + job.state.runningAtMs = undefined; + changed = true; + } + + return { changed, skip: false }; +} + export function recomputeNextRuns(state: CronServiceState): boolean { if (!state.store) { return false; @@ -97,30 +134,13 @@ export function recomputeNextRuns(state: CronServiceState): boolean { let changed = false; const now = state.deps.nowMs(); for (const job of state.store.jobs) { - if (!job.state) { - job.state = {}; + const tick = normalizeJobTickState({ state, job, nowMs: now }); + if (tick.changed) { changed = true; } - if (!job.enabled) { - if (job.state.nextRunAtMs !== undefined) { - job.state.nextRunAtMs = undefined; - changed = true; - } - if (job.state.runningAtMs !== undefined) { - job.state.runningAtMs = undefined; - changed = true; - } + if (tick.skip) { continue; } - const runningAt = job.state.runningAtMs; - if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) { - state.deps.log.warn( - { jobId: job.id, runningAtMs: runningAt }, - "cron: clearing stuck running marker", - ); - job.state.runningAtMs = undefined; - changed = true; - } // Only recompute if nextRunAtMs is missing or already past-due. // Preserving a still-future nextRunAtMs avoids accidentally advancing // a job that hasn't fired yet (e.g. during restart recovery). @@ -177,30 +197,13 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea let changed = false; const now = state.deps.nowMs(); for (const job of state.store.jobs) { - if (!job.state) { - job.state = {}; + const tick = normalizeJobTickState({ state, job, nowMs: now }); + if (tick.changed) { changed = true; } - if (!job.enabled) { - if (job.state.nextRunAtMs !== undefined) { - job.state.nextRunAtMs = undefined; - changed = true; - } - if (job.state.runningAtMs !== undefined) { - job.state.runningAtMs = undefined; - changed = true; - } + if (tick.skip) { continue; } - const runningAt = job.state.runningAtMs; - if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) { - state.deps.log.warn( - { jobId: job.id, runningAtMs: runningAt }, - "cron: clearing stuck running marker", - ); - job.state.runningAtMs = undefined; - changed = true; - } // Only compute missing nextRunAtMs, do NOT recompute existing ones. // If a job was past-due but not found by findDueJobs, recomputing would // cause it to be silently skipped.