242 lines
7.6 KiB
TypeScript
242 lines
7.6 KiB
TypeScript
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
|
import type { CronJob } from "../types.js";
|
|
import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js";
|
|
import { locked } from "./locked.js";
|
|
import type { CronEvent, CronServiceState } from "./state.js";
|
|
import { ensureLoaded, persist } from "./store.js";
|
|
|
|
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
|
|
|
|
export function armTimer(state: CronServiceState) {
|
|
if (state.timer) clearTimeout(state.timer);
|
|
state.timer = null;
|
|
if (!state.deps.cronEnabled) return;
|
|
const nextAt = nextWakeAtMs(state);
|
|
if (!nextAt) return;
|
|
const delay = Math.max(nextAt - state.deps.nowMs(), 0);
|
|
// Avoid TimeoutOverflowWarning when a job is far in the future.
|
|
const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS);
|
|
state.timer = setTimeout(() => {
|
|
void onTimer(state).catch((err) => {
|
|
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
|
|
});
|
|
}, clampedDelay);
|
|
state.timer.unref?.();
|
|
}
|
|
|
|
export async function onTimer(state: CronServiceState) {
|
|
if (state.running) return;
|
|
state.running = true;
|
|
try {
|
|
await locked(state, async () => {
|
|
await ensureLoaded(state);
|
|
await runDueJobs(state);
|
|
await persist(state);
|
|
armTimer(state);
|
|
});
|
|
} finally {
|
|
state.running = false;
|
|
}
|
|
}
|
|
|
|
export async function runDueJobs(state: CronServiceState) {
|
|
if (!state.store) return;
|
|
const now = state.deps.nowMs();
|
|
const due = state.store.jobs.filter((j) => {
|
|
if (!j.enabled) return false;
|
|
if (typeof j.state.runningAtMs === "number") return false;
|
|
const next = j.state.nextRunAtMs;
|
|
return typeof next === "number" && now >= next;
|
|
});
|
|
for (const job of due) {
|
|
await executeJob(state, job, now, { forced: false });
|
|
}
|
|
}
|
|
|
|
export async function executeJob(
|
|
state: CronServiceState,
|
|
job: CronJob,
|
|
nowMs: number,
|
|
opts: { forced: boolean },
|
|
) {
|
|
const startedAt = state.deps.nowMs();
|
|
job.state.runningAtMs = startedAt;
|
|
job.state.lastError = undefined;
|
|
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
|
|
|
|
let deleted = false;
|
|
|
|
const finish = async (
|
|
status: "ok" | "error" | "skipped",
|
|
err?: string,
|
|
summary?: string,
|
|
outputText?: string,
|
|
) => {
|
|
const endedAt = state.deps.nowMs();
|
|
job.state.runningAtMs = undefined;
|
|
job.state.lastRunAtMs = startedAt;
|
|
job.state.lastStatus = status;
|
|
job.state.lastDurationMs = Math.max(0, endedAt - startedAt);
|
|
job.state.lastError = err;
|
|
|
|
const shouldDelete =
|
|
job.schedule.kind === "at" && status === "ok" && job.deleteAfterRun === true;
|
|
|
|
if (!shouldDelete) {
|
|
if (job.schedule.kind === "at" && status === "ok") {
|
|
// One-shot job completed successfully; disable it.
|
|
job.enabled = false;
|
|
job.state.nextRunAtMs = undefined;
|
|
} else if (job.enabled) {
|
|
job.state.nextRunAtMs = computeJobNextRunAtMs(job, endedAt);
|
|
} else {
|
|
job.state.nextRunAtMs = undefined;
|
|
}
|
|
}
|
|
|
|
emit(state, {
|
|
jobId: job.id,
|
|
action: "finished",
|
|
status,
|
|
error: err,
|
|
summary,
|
|
runAtMs: startedAt,
|
|
durationMs: job.state.lastDurationMs,
|
|
nextRunAtMs: job.state.nextRunAtMs,
|
|
});
|
|
|
|
if (shouldDelete && state.store) {
|
|
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
|
|
deleted = true;
|
|
emit(state, { jobId: job.id, action: "removed" });
|
|
}
|
|
|
|
if (job.sessionTarget === "isolated") {
|
|
const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron";
|
|
const mode = job.isolation?.postToMainMode ?? "summary";
|
|
|
|
let body = (summary ?? err ?? status).trim();
|
|
if (mode === "full") {
|
|
// Prefer full agent output if available; fall back to summary.
|
|
const maxCharsRaw = job.isolation?.postToMainMaxChars;
|
|
const maxChars = Number.isFinite(maxCharsRaw) ? Math.max(0, maxCharsRaw as number) : 8000;
|
|
const fullText = (outputText ?? "").trim();
|
|
if (fullText) {
|
|
body = fullText.length > maxChars ? `${fullText.slice(0, maxChars)}…` : fullText;
|
|
}
|
|
}
|
|
|
|
const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`;
|
|
state.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`, {
|
|
agentId: job.agentId,
|
|
});
|
|
if (job.wakeMode === "now") {
|
|
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}:post` });
|
|
}
|
|
}
|
|
};
|
|
|
|
try {
|
|
if (job.sessionTarget === "main") {
|
|
const text = resolveJobPayloadTextForMain(job);
|
|
if (!text) {
|
|
const kind = job.payload.kind;
|
|
await finish(
|
|
"skipped",
|
|
kind === "systemEvent"
|
|
? "main job requires non-empty systemEvent text"
|
|
: 'main job requires payload.kind="systemEvent"',
|
|
);
|
|
return;
|
|
}
|
|
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
|
|
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
|
const reason = `cron:${job.id}`;
|
|
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
|
const maxWaitMs = 2 * 60_000;
|
|
const waitStartedAt = state.deps.nowMs();
|
|
|
|
let heartbeatResult: HeartbeatRunResult;
|
|
for (;;) {
|
|
heartbeatResult = await state.deps.runHeartbeatOnce({ reason });
|
|
if (
|
|
heartbeatResult.status !== "skipped" ||
|
|
heartbeatResult.reason !== "requests-in-flight"
|
|
) {
|
|
break;
|
|
}
|
|
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
|
|
heartbeatResult = {
|
|
status: "skipped",
|
|
reason: "timeout waiting for main lane to become idle",
|
|
};
|
|
break;
|
|
}
|
|
await delay(250);
|
|
}
|
|
|
|
if (heartbeatResult.status === "ran") {
|
|
await finish("ok", undefined, text);
|
|
} else if (heartbeatResult.status === "skipped") {
|
|
await finish("skipped", heartbeatResult.reason, text);
|
|
} else {
|
|
await finish("error", heartbeatResult.reason, text);
|
|
}
|
|
} else {
|
|
// wakeMode is "next-heartbeat" or runHeartbeatOnce not available
|
|
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
|
await finish("ok", undefined, text);
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (job.payload.kind !== "agentTurn") {
|
|
await finish("skipped", "isolated job requires payload.kind=agentTurn");
|
|
return;
|
|
}
|
|
|
|
const res = await state.deps.runIsolatedAgentJob({
|
|
job,
|
|
message: job.payload.message,
|
|
});
|
|
if (res.status === "ok") await finish("ok", undefined, res.summary, res.outputText);
|
|
else if (res.status === "skipped")
|
|
await finish("skipped", undefined, res.summary, res.outputText);
|
|
else await finish("error", res.error ?? "cron job failed", res.summary, res.outputText);
|
|
} catch (err) {
|
|
await finish("error", String(err));
|
|
} finally {
|
|
job.updatedAtMs = nowMs;
|
|
if (!opts.forced && job.enabled && !deleted) {
|
|
// Keep nextRunAtMs in sync in case the schedule advanced during a long run.
|
|
job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs());
|
|
}
|
|
}
|
|
}
|
|
|
|
export function wake(
|
|
state: CronServiceState,
|
|
opts: { mode: "now" | "next-heartbeat"; text: string },
|
|
) {
|
|
const text = opts.text.trim();
|
|
if (!text) return { ok: false } as const;
|
|
state.deps.enqueueSystemEvent(text);
|
|
if (opts.mode === "now") {
|
|
state.deps.requestHeartbeatNow({ reason: "wake" });
|
|
}
|
|
return { ok: true } as const;
|
|
}
|
|
|
|
export function stopTimer(state: CronServiceState) {
|
|
if (state.timer) clearTimeout(state.timer);
|
|
state.timer = null;
|
|
}
|
|
|
|
export function emit(state: CronServiceState, evt: CronEvent) {
|
|
try {
|
|
state.deps.onEvent?.(evt);
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|