/*
 * - Added support for new delivery modes in cron jobs: `announce`, `deliver`, and `none`.
 * - Updated documentation to reflect changes in delivery options and usage examples.
 * - Enhanced the cron job schema to include delivery configuration.
 * - Refactored related CLI commands and UI components to accommodate the new delivery settings.
 * - Improved handling of legacy delivery fields for backward compatibility.
 *
 * This update allows users to choose how output from isolated jobs is delivered, enhancing flexibility in job management.
 */
263 lines
7.8 KiB
TypeScript
263 lines
7.8 KiB
TypeScript
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
|
import type { CronJob } from "../types.js";
|
|
import type { CronEvent, CronServiceState } from "./state.js";
|
|
import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js";
|
|
import { locked } from "./locked.js";
|
|
import { ensureLoaded, persist } from "./store.js";
|
|
|
|
/** Upper bound applied to timer delays: 2^31 - 1 milliseconds (the largest signed 32-bit value). */
const MAX_TIMEOUT_MS = 0x7fffffff;
|
|
|
|
/**
 * (Re)schedules the single wake-up timer stored on `state.timer`.
 *
 * Any pending timer is cancelled first. Nothing is scheduled when cron is
 * disabled or when `nextWakeAtMs` reports no wake time. Otherwise one timeout
 * is armed that runs `onTimer`; a rejection from that run is logged, not thrown.
 *
 * @param state - Cron service state holding the timer handle and dependencies.
 */
export function armTimer(state: CronServiceState) {
  // Drop whatever was armed before so at most one timer is ever pending.
  if (state.timer) {
    clearTimeout(state.timer);
  }
  state.timer = null;

  if (!state.deps.cronEnabled) {
    return;
  }

  const wakeAt = nextWakeAtMs(state);
  if (!wakeAt) {
    return;
  }

  // A wake time in the past fires immediately; one far in the future is capped
  // to avoid TimeoutOverflowWarning.
  const msUntilWake = Math.max(wakeAt - state.deps.nowMs(), 0);
  const timeoutMs = Math.min(msUntilWake, MAX_TIMEOUT_MS);

  const reportTickFailure = (err: unknown) => {
    state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
  };

  state.timer = setTimeout(() => {
    void onTimer(state).catch(reportTickFailure);
  }, timeoutMs);

  // Where the handle supports it, do not let this timer alone keep the process alive.
  state.timer.unref?.();
}
|
|
|
|
/**
 * Timer callback: runs one scheduler tick unless a tick is already in progress.
 *
 * Under `locked`, a tick loads the store, runs every due job, persists the
 * result and re-arms the timer. `state.running` is set for the duration of the
 * tick and always cleared afterwards, even when the tick throws.
 *
 * @param state - Cron service state.
 */
export async function onTimer(state: CronServiceState) {
  if (state.running) {
    return;
  }

  const tick = async () => {
    await ensureLoaded(state);
    await runDueJobs(state);
    await persist(state);
    armTimer(state);
  };

  state.running = true;
  try {
    await locked(state, tick);
  } finally {
    state.running = false;
  }
}
|
|
|
|
/**
 * Executes, one after another, every job that is due at the moment of the call.
 *
 * A job is due when it is enabled, is not marked as running
 * (`state.runningAtMs` is not a number) and has a numeric `nextRunAtMs` that is
 * at or before the current time. The due list is computed once up front, so
 * changes made while jobs run do not add to this pass.
 *
 * @param state - Cron service state; does nothing when no store is loaded.
 */
export async function runDueJobs(state: CronServiceState) {
  const store = state.store;
  if (!store) {
    return;
  }

  const now = state.deps.nowMs();
  const dueJobs = store.jobs.filter(
    (candidate) =>
      candidate.enabled &&
      typeof candidate.state.runningAtMs !== "number" &&
      typeof candidate.state.nextRunAtMs === "number" &&
      now >= candidate.state.nextRunAtMs,
  );

  for (let i = 0; i < dueJobs.length; i += 1) {
    await executeJob(state, dueJobs[i], now, { forced: false });
  }
}
|
|
|
|
/**
 * Shortens `text` to at most `maxChars` UTF-16 code units, appending "…" when
 * anything was cut. If the cut would fall between the two halves of a surrogate
 * pair, it is moved back one unit so the result never ends in a lone surrogate.
 *
 * @param text - Text to shorten.
 * @param maxChars - Finite, non-negative limit in UTF-16 code units.
 * @returns `text` unchanged when it fits, otherwise the shortened text plus "…".
 */
function clipPostToMainText(text: string, maxChars: number): string {
  if (text.length <= maxChars) {
    return text;
  }
  // slice() would truncate a fractional limit anyway; do it explicitly so the
  // surrogate check below looks at the real cut position.
  let end = Math.floor(maxChars);
  if (end > 0) {
    const before = text.charCodeAt(end - 1);
    const after = text.charCodeAt(end);
    const splitsSurrogatePair =
      before >= 0xd800 && before <= 0xdbff && after >= 0xdc00 && after <= 0xdfff;
    if (splitsSurrogatePair) {
      end -= 1;
    }
  }
  return `${text.slice(0, end)}…`;
}

/**
 * Runs a single cron job and records the outcome on `job.state`.
 *
 * Emits a "started" event, runs the job according to `job.sessionTarget`
 * ("main": enqueue the payload text as a system event and trigger a heartbeat;
 * otherwise: run an isolated agent turn), then records status, duration, error
 * and next run time and emits a "finished" event. A successful one-shot ("at")
 * job is disabled, or removed from the store when `deleteAfterRun` is true.
 *
 * Errors thrown while running are caught and recorded as status "error".
 *
 * @param state - Cron service state (store and dependencies).
 * @param job - The job to run; mutated in place.
 * @param nowMs - Timestamp written to `job.updatedAtMs` when the run ends.
 * @param opts - `forced: true` skips the final recomputation of `nextRunAtMs`.
 */
export async function executeJob(
  state: CronServiceState,
  job: CronJob,
  nowMs: number,
  opts: { forced: boolean },
) {
  const startedAt = state.deps.nowMs();
  job.state.runningAtMs = startedAt;
  job.state.lastError = undefined;
  emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });

  // Set once the job has been removed from the store, so the `finally` block
  // does not reschedule a job that no longer exists.
  let deleted = false;

  // Records the outcome, emits "finished" (and "removed" when applicable) and,
  // for isolated jobs without a `delivery` config, posts the result as a system event.
  // NOTE(review): if this throws after emitting (e.g. from enqueueSystemEvent),
  // the outer catch calls it a second time with status "error".
  const finish = async (
    status: "ok" | "error" | "skipped",
    err?: string,
    summary?: string,
    outputText?: string,
  ) => {
    const endedAt = state.deps.nowMs();
    job.state.runningAtMs = undefined;
    job.state.lastRunAtMs = startedAt;
    job.state.lastStatus = status;
    job.state.lastDurationMs = Math.max(0, endedAt - startedAt);
    job.state.lastError = err;

    const shouldDelete =
      job.schedule.kind === "at" && status === "ok" && job.deleteAfterRun === true;

    if (!shouldDelete) {
      if (job.schedule.kind === "at" && status === "ok") {
        // One-shot job completed successfully; disable it.
        job.enabled = false;
        job.state.nextRunAtMs = undefined;
      } else if (job.enabled) {
        job.state.nextRunAtMs = computeJobNextRunAtMs(job, endedAt);
      } else {
        job.state.nextRunAtMs = undefined;
      }
    }

    emit(state, {
      jobId: job.id,
      action: "finished",
      status,
      error: err,
      summary,
      runAtMs: startedAt,
      durationMs: job.state.lastDurationMs,
      nextRunAtMs: job.state.nextRunAtMs,
    });

    if (shouldDelete && state.store) {
      state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
      deleted = true;
      emit(state, { jobId: job.id, action: "removed" });
    }

    if (job.sessionTarget === "isolated" && !job.delivery) {
      const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron";
      const mode = job.isolation?.postToMainMode ?? "summary";

      let body = (summary ?? err ?? status).trim();
      if (mode === "full") {
        // Prefer full agent output if available; fall back to summary.
        const maxCharsRaw = job.isolation?.postToMainMaxChars;
        const maxChars = Number.isFinite(maxCharsRaw) ? Math.max(0, maxCharsRaw as number) : 8000;
        const fullText = (outputText ?? "").trim();
        if (fullText) {
          body = clipPostToMainText(fullText, maxChars);
        }
      }

      const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`;
      state.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`, {
        agentId: job.agentId,
      });
      if (job.wakeMode === "now") {
        state.deps.requestHeartbeatNow({ reason: `cron:${job.id}:post` });
      }
    }
  };

  try {
    if (job.sessionTarget === "main") {
      const text = resolveJobPayloadTextForMain(job);
      if (!text) {
        const kind = job.payload.kind;
        await finish(
          "skipped",
          kind === "systemEvent"
            ? "main job requires non-empty systemEvent text"
            : 'main job requires payload.kind="systemEvent"',
        );
        return;
      }
      state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
      if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
        const reason = `cron:${job.id}`;
        const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
        const maxWaitMs = 2 * 60_000;
        const waitStartedAt = state.deps.nowMs();

        // Retry every 250ms while the heartbeat is skipped with
        // "requests-in-flight", giving up after maxWaitMs.
        let heartbeatResult: HeartbeatRunResult;
        for (;;) {
          heartbeatResult = await state.deps.runHeartbeatOnce({ reason });
          if (
            heartbeatResult.status !== "skipped" ||
            heartbeatResult.reason !== "requests-in-flight"
          ) {
            break;
          }
          if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
            heartbeatResult = {
              status: "skipped",
              reason: "timeout waiting for main lane to become idle",
            };
            break;
          }
          await delay(250);
        }

        if (heartbeatResult.status === "ran") {
          await finish("ok", undefined, text);
        } else if (heartbeatResult.status === "skipped") {
          await finish("skipped", heartbeatResult.reason, text);
        } else {
          await finish("error", heartbeatResult.reason, text);
        }
      } else {
        // wakeMode is "next-heartbeat" or runHeartbeatOnce not available
        state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
        await finish("ok", undefined, text);
      }
      return;
    }

    if (job.payload.kind !== "agentTurn") {
      await finish("skipped", "isolated job requires payload.kind=agentTurn");
      return;
    }

    const res = await state.deps.runIsolatedAgentJob({
      job,
      message: job.payload.message,
    });
    if (res.status === "ok") {
      await finish("ok", undefined, res.summary, res.outputText);
    } else if (res.status === "skipped") {
      await finish("skipped", undefined, res.summary, res.outputText);
    } else {
      await finish("error", res.error ?? "cron job failed", res.summary, res.outputText);
    }
  } catch (err) {
    await finish("error", String(err));
  } finally {
    job.updatedAtMs = nowMs;
    if (!opts.forced && job.enabled && !deleted) {
      // Keep nextRunAtMs in sync in case the schedule advanced during a long run.
      job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs());
    }
  }
}
|
|
|
|
/**
 * Enqueues `opts.text` (trimmed) as a system event and, in "now" mode, also
 * requests an immediate heartbeat with reason "wake".
 *
 * @param state - Cron service state providing the dependencies.
 * @param opts - Wake mode and the text to enqueue.
 * @returns `{ ok: false }` when the trimmed text is empty (nothing is enqueued),
 *   otherwise `{ ok: true }`.
 */
export function wake(
  state: CronServiceState,
  opts: { mode: "now" | "next-heartbeat"; text: string },
) {
  const message = opts.text.trim();
  if (message.length === 0) {
    return { ok: false } as const;
  }

  state.deps.enqueueSystemEvent(message);

  const wantsImmediateHeartbeat = opts.mode === "now";
  if (wantsImmediateHeartbeat) {
    state.deps.requestHeartbeatNow({ reason: "wake" });
  }

  return { ok: true } as const;
}
|
|
|
|
/**
 * Cancels the pending wake-up timer, if any, and resets `state.timer` to null.
 *
 * @param state - Cron service state holding the timer handle.
 */
export function stopTimer(state: CronServiceState) {
  const pending = state.timer;
  if (pending) {
    clearTimeout(pending);
  }
  state.timer = null;
}
|
|
|
|
/**
 * Delivers `evt` to the optional `state.deps.onEvent` listener.
 *
 * Delivery is best-effort: anything thrown while notifying the listener is
 * deliberately discarded so the caller is never interrupted.
 *
 * @param state - Cron service state providing the optional listener.
 * @param evt - Event to deliver.
 */
export function emit(state: CronServiceState, evt: CronEvent) {
  try {
    const { deps } = state;
    deps.onEvent?.(evt);
  } catch {
    // Intentionally ignored: listener failures must not affect the caller.
  }
}
|