diff --git a/src/cron/service.persists-delivered-status.test.ts b/src/cron/service.persists-delivered-status.test.ts index ea9712aca..4af3dd575 100644 --- a/src/cron/service.persists-delivered-status.test.ts +++ b/src/cron/service.persists-delivered-status.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from "vitest"; import { CronService } from "./service.js"; import { + createFinishedBarrier, createStartedCronServiceWithFinishedBarrier, createCronStoreHarness, createNoopLogger, @@ -11,107 +12,125 @@ const noopLogger = createNoopLogger(); const { makeStorePath } = createCronStoreHarness(); installCronTestHooks({ logger: noopLogger }); +type CronAddInput = Parameters[0]; + +function buildIsolatedAgentTurnJob(name: string): CronAddInput { + return { + name, + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { mode: "none" }, + }; +} + +function buildMainSessionSystemEventJob(name: string): CronAddInput { + return { + name, + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + }; +} + +function createIsolatedCronWithFinishedBarrier(params: { + storePath: string; + delivered?: boolean; + onFinished?: (evt: { jobId: string; delivered?: boolean }) => void; +}) { + const finished = createFinishedBarrier(); + const cron = new CronService({ + storePath: params.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ + status: "ok" as const, + summary: "done", + ...(params.delivered === undefined ? {} : { delivered: params.delivered }), + })), + onEvent: (evt) => { + if (evt.action === "finished") { + params.onFinished?.({ jobId: evt.jobId, delivered: evt.delivered }); + } + finished.onEvent(evt); + }, + }); + return { cron, finished }; +} + +async function runSingleJobAndReadState(params: { + cron: CronService; + finished: ReturnType; + job: CronAddInput; +}) { + const job = await params.cron.add(params.job); + vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); + await vi.runOnlyPendingTimersAsync(); + await params.finished.waitForOk(job.id); + + const jobs = await params.cron.list({ includeDisabled: true }); + return { job, updated: jobs.find((entry) => entry.id === job.id) }; +} + describe("CronService persists delivered status", () => { it("persists lastDelivered=true when isolated job reports delivered", async () => { const store = await makeStorePath(); - const finished = { - resolvers: new Map void>(), - waitForOk(jobId: string) { - return new Promise((resolve) => { - this.resolvers.set(jobId, resolve); - }); - }, - }; - - const cron = new CronService({ + const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ - status: "ok" as const, - summary: "done", - delivered: true, - })), - onEvent: (evt) => { - if (evt.action === "finished" && evt.status === "ok") { - finished.resolvers.get(evt.jobId)?.(); - finished.resolvers.delete(evt.jobId); - } - }, + delivered: true, }); await cron.start(); - const job = await cron.add({ - name: "delivered-true", - enabled: true, - schedule: { kind: "every", everyMs: 60_000 }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "test" }, - delivery: { mode: "none" }, + const { updated } = await runSingleJobAndReadState({ + cron, + finished, + job: buildIsolatedAgentTurnJob("delivered-true"), }); - vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); - await vi.runOnlyPendingTimersAsync(); - await finished.waitForOk(job.id); - - const jobs = await cron.list({ includeDisabled: true }); - const updated = jobs.find((j) => j.id === job.id); - expect(updated?.state.lastStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBe(true); cron.stop(); }); - it("persists lastDelivered=undefined when isolated job does not deliver", async () => { + it("persists lastDelivered=false when isolated job explicitly reports not delivered", async () => { const store = await makeStorePath(); - const finished = { - resolvers: new Map void>(), - waitForOk(jobId: string) { - return new Promise((resolve) => { - this.resolvers.set(jobId, resolve); - }); - }, - }; - - const cron = new CronService({ + const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ - status: "ok" as const, - summary: "done", - })), - onEvent: (evt) => { - if (evt.action === "finished" && evt.status === "ok") { - finished.resolvers.get(evt.jobId)?.(); - finished.resolvers.delete(evt.jobId); - } - }, + delivered: false, }); await cron.start(); - const job = await cron.add({ - name: "no-delivery", - enabled: true, - schedule: { kind: "every", everyMs: 60_000 }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "test" }, - delivery: { mode: "none" }, + const { updated } = await runSingleJobAndReadState({ + cron, + finished, + job: buildIsolatedAgentTurnJob("delivered-false"), }); - vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); - await vi.runOnlyPendingTimersAsync(); - await finished.waitForOk(job.id); + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastDelivered).toBe(false); - const jobs = await cron.list({ includeDisabled: true }); - const updated = jobs.find((j) => j.id === job.id); + cron.stop(); + }); + + it("persists lastDelivered=undefined when isolated job does not deliver", async () => { + const store = await makeStorePath(); + const { cron, finished } = createIsolatedCronWithFinishedBarrier({ + storePath: store.storePath, + }); + + await cron.start(); + const { updated } = await runSingleJobAndReadState({ + cron, + finished, + job: buildIsolatedAgentTurnJob("no-delivery"), + }); expect(updated?.state.lastStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBeUndefined(); @@ -127,22 +146,12 @@ describe("CronService persists delivered status", () => { }); await cron.start(); - const job = await cron.add({ - name: "main-session", - enabled: true, - schedule: { kind: "every", everyMs: 60_000 }, - sessionTarget: "main", - wakeMode: "next-heartbeat", - payload: { kind: "systemEvent", text: "tick" }, + const { updated } = await runSingleJobAndReadState({ + cron, + finished, + job: buildMainSessionSystemEventJob("main-session"), }); - vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); - await vi.runOnlyPendingTimersAsync(); - await finished.waitForOk(job.id); - - const jobs = await cron.list({ includeDisabled: true }); - const updated = jobs.find((j) => j.id === job.id); - expect(updated?.state.lastStatus).toBe("ok"); expect(updated?.state.lastDelivered).toBeUndefined(); expect(enqueueSystemEvent).toHaveBeenCalled(); @@ -153,58 +162,23 @@ describe("CronService persists delivered status", () => { it("emits delivered in the finished event", async () => { const store = await makeStorePath(); let capturedEvent: { jobId: string; delivered?: boolean } | undefined; - const finished = { - resolvers: new Map void>(), - waitForOk(jobId: string) { - return new Promise((resolve) => { - this.resolvers.set(jobId, resolve); - }); - }, - }; - - const cron = new CronService({ + const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async () => ({ - status: "ok" as const, - summary: "done", - delivered: true, - })), - onEvent: (evt) => { - if (evt.action === "finished") { - capturedEvent = { jobId: evt.jobId, delivered: evt.delivered }; - if (evt.status === "ok") { - finished.resolvers.get(evt.jobId)?.(); - finished.resolvers.delete(evt.jobId); - } - } + delivered: true, + onFinished: (evt) => { + capturedEvent = evt; }, }); await cron.start(); - const job = await cron.add({ - name: "event-test", - enabled: true, - schedule: { kind: "every", everyMs: 60_000 }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: "test" }, - delivery: { mode: "none" }, + await runSingleJobAndReadState({ + cron, + finished, + job: buildIsolatedAgentTurnJob("event-test"), }); - vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); - await vi.runOnlyPendingTimersAsync(); - await finished.waitForOk(job.id); - expect(capturedEvent).toBeDefined(); expect(capturedEvent?.delivered).toBe(true); - - // Flush pending store writes before stopping so the temp file is released - // (prevents ENOTEMPTY on Windows when afterAll removes the fixture dir). - await cron.list({ includeDisabled: true }); cron.stop(); }); });