diff --git a/CHANGELOG.md b/CHANGELOG.md index ed7195a9b..95a54c4b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai - Dev tooling: prevent `CLAUDE.md` symlink target regressions by excluding CLAUDE symlink sentinels from `oxfmt` and marking them `-text` in `.gitattributes`, so formatter/EOL normalization cannot reintroduce trailing-newline targets. Thanks @vincentkoc. - Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman. - Cron/Isolation: force fresh session IDs for isolated cron runs so `sessionTarget="isolated"` executions never reuse prior run context. (#23470) Thanks @echoVic. +- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. - Feishu/Media: for inbound video messages that include both `file_key` (video) and `image_key` (thumbnail), prefer `file_key` when downloading media so video attachments are saved instead of silently failing on thumbnail keys. (#23633) - Hooks/Cron: suppress duplicate main-session events for delivered hook turns and mark `SILENT_REPLY_TOKEN` (`NO_REPLY`) early exits as delivered to prevent hook context pollution. (#20678) Thanks @JonathanWorks. diff --git a/src/cron/service.read-ops-nonblocking.test.ts b/src/cron/service.read-ops-nonblocking.test.ts index e6a24957a..1af332c19 100644 --- a/src/cron/service.read-ops-nonblocking.test.ts +++ b/src/cron/service.read-ops-nonblocking.test.ts @@ -162,6 +162,59 @@ describe("CronService read ops while job is running", () => { } }); + it("keeps list and status responsive during manual cron.run execution", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const isolatedRun = createDeferredIsolatedRun(); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: isolatedRun.runIsolatedAgentJob, + }); + + try { + await cron.start(); + const job = await cron.add({ + name: "manual run isolation", + enabled: true, + deleteAfterRun: false, + schedule: { + kind: "at", + at: new Date("2030-01-01T00:00:00.000Z").toISOString(), + }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "manual run" }, + delivery: { mode: "none" }, + }); + + const runPromise = cron.run(job.id, "force"); + await isolatedRun.runStarted; + + await expect( + withTimeout(cron.list({ includeDisabled: true }), 300, "cron.list during cron.run"), + ).resolves.toBeTypeOf("object"); + await expect(withTimeout(cron.status(), 300, "cron.status during cron.run")).resolves.toEqual( + expect.objectContaining({ enabled: true, storePath: store.storePath }), + ); + + isolatedRun.completeRun({ status: "ok", summary: "manual done" }); + await expect(runPromise).resolves.toEqual({ ok: true, ran: true }); + + const completed = await cron.list({ includeDisabled: true }); + expect(completed[0]?.state.lastStatus).toBe("ok"); + expect(completed[0]?.state.runningAtMs).toBeUndefined(); + } finally { + cron.stop(); + await store.cleanup(); + } + }); + it("keeps list and status responsive during startup catch-up runs", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 9c71ae4f1..0a60b88f5 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -12,7 +12,15 @@ import { import { locked } from "./locked.js"; import type { CronServiceState } from "./state.js"; import { ensureLoaded, persist, warnIfDisabled } from "./store.js"; -import { armTimer, emit, executeJob, runMissedJobs, stopTimer, wake } from "./timer.js"; +import { + applyJobResult, + armTimer, + emit, + executeJobCore, + runMissedJobs, + stopTimer, + wake, +} from "./timer.js"; async function ensureLoadedForRead(state: CronServiceState) { await ensureLoaded(state, { skipRecompute: true }); @@ -201,7 +209,7 @@ export async function remove(state: CronServiceState, id: string) { } export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { - return await locked(state, async () => { + const prepared = await locked(state, async () => { warnIfDisabled(state, "run"); await ensureLoaded(state, { skipRecompute: true }); const job = findJobOrThrow(state, id); @@ -213,12 +221,82 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f if (!due) { return { ok: true, ran: false, reason: "not-due" as const }; } - await executeJob(state, job, now, { forced: mode === "force" }); + + // Reserve this run under lock, then execute outside lock so read ops + // (`list`, `status`) stay responsive while the run is in progress. + job.state.runningAtMs = now; + job.state.lastError = undefined; + emit(state, { jobId: job.id, action: "started", runAtMs: now }); + const executionJob = JSON.parse(JSON.stringify(job)) as typeof job; + return { + ok: true, + ran: true, + jobId: job.id, + startedAt: now, + executionJob, + } as const; + }); + + if (!prepared.ran) { + return prepared; + } + + let coreResult: + | Awaited> + | { + status: "error"; + error: string; + }; + try { + coreResult = await executeJobCore(state, prepared.executionJob); + } catch (err) { + coreResult = { status: "error", error: String(err) }; + } + const endedAt = state.deps.nowMs(); + + await locked(state, async () => { + await ensureLoaded(state, { skipRecompute: true }); + const job = state.store?.jobs.find((entry) => entry.id === prepared.jobId); + if (!job) { + return; + } + + const shouldDelete = applyJobResult(state, job, { + status: coreResult.status, + error: coreResult.error, + delivered: coreResult.delivered, + startedAt: prepared.startedAt, + endedAt, + }); + + emit(state, { + jobId: job.id, + action: "finished", + status: coreResult.status, + error: coreResult.error, + summary: coreResult.summary, + delivered: coreResult.delivered, + sessionId: coreResult.sessionId, + sessionKey: coreResult.sessionKey, + runAtMs: prepared.startedAt, + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + model: coreResult.model, + provider: coreResult.provider, + usage: coreResult.usage, + }); + + if (shouldDelete && state.store) { + state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id); + emit(state, { jobId: job.id, action: "removed" }); + } + recomputeNextRuns(state); await persist(state); armTimer(state); - return { ok: true, ran: true } as const; }); + + return { ok: true, ran: true } as const; } export function wakeNow( diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 8b80dbd90..5b334d3a8 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -29,7 +29,7 @@ const MIN_REFIRE_GAP_MS = 2_000; * on top of the per-provider / per-agent timeouts to prevent one stuck job * from wedging the entire cron lane. */ -const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes +export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { @@ -68,7 +68,7 @@ function errorBackoffMs(consecutiveErrors: number): number { * Handles consecutive error tracking, exponential backoff, one-shot disable, * and nextRunAtMs computation. Returns `true` if the job should be deleted. */ -function applyJobResult( +export function applyJobResult( state: CronServiceState, job: CronJob, result: { @@ -556,7 +556,7 @@ export async function runDueJobs(state: CronServiceState) { } } -async function executeJobCore( +export async function executeJobCore( state: CronServiceState, job: CronJob, abortSignal?: AbortSignal,