fix(cron): fix timeout, add timestamp validation, enable file sync
Fixes #7667 Task 1: Fix cron operation timeouts - Increase default gateway tool timeout from 10s to 30s - Increase cron-specific tool timeout to 60s - Increase CLI default timeout from 10s to 30s - Prevents timeouts when gateway is busy with long-running jobs Task 2: Add timestamp validation - New validateScheduleTimestamp() function in validate-timestamp.ts - Rejects atMs timestamps more than 1 minute in the past - Rejects atMs timestamps more than 10 years in the future - Applied to both cron.add and cron.update operations - Provides helpful error messages with current time and offset Task 3: Enable file sync for manual edits - Track file modification time (storeFileMtimeMs) in CronServiceState - Check file mtime in ensureLoaded() and reload if changed - Recompute next runs after reload to maintain accuracy - Update mtime after persist() to prevent reload loop - Dashboard now picks up manual edits to ~/.openclaw/cron/jobs.json
This commit is contained in:
committed by
Peter Steinberger
parent
a749db9820
commit
3a03e38378
@@ -208,7 +208,7 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
|
|||||||
const gatewayOpts: GatewayCallOptions = {
|
const gatewayOpts: GatewayCallOptions = {
|
||||||
gatewayUrl: readStringParam(params, "gatewayUrl", { trim: false }),
|
gatewayUrl: readStringParam(params, "gatewayUrl", { trim: false }),
|
||||||
gatewayToken: readStringParam(params, "gatewayToken", { trim: false }),
|
gatewayToken: readStringParam(params, "gatewayToken", { trim: false }),
|
||||||
timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : undefined,
|
timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : 60_000,
|
||||||
};
|
};
|
||||||
|
|
||||||
switch (action) {
|
switch (action) {
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ export function resolveGatewayOptions(opts?: GatewayCallOptions) {
|
|||||||
const timeoutMs =
|
const timeoutMs =
|
||||||
typeof opts?.timeoutMs === "number" && Number.isFinite(opts.timeoutMs)
|
typeof opts?.timeoutMs === "number" && Number.isFinite(opts.timeoutMs)
|
||||||
? Math.max(1, Math.floor(opts.timeoutMs))
|
? Math.max(1, Math.floor(opts.timeoutMs))
|
||||||
: 10_000;
|
: 30_000;
|
||||||
return { url, token, timeoutMs };
|
return { url, token, timeoutMs };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ export function addGatewayClientOptions(cmd: Command) {
|
|||||||
return cmd
|
return cmd
|
||||||
.option("--url <url>", "Gateway WebSocket URL (defaults to gateway.remote.url when configured)")
|
.option("--url <url>", "Gateway WebSocket URL (defaults to gateway.remote.url when configured)")
|
||||||
.option("--token <token>", "Gateway token (if required)")
|
.option("--token <token>", "Gateway token (if required)")
|
||||||
.option("--timeout <ms>", "Timeout in ms", "10000")
|
.option("--timeout <ms>", "Timeout in ms", "30000")
|
||||||
.option("--expect-final", "Wait for final response (agent)", false);
|
.option("--expect-final", "Wait for final response (agent)", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -48,6 +48,8 @@ export type CronServiceState = {
|
|||||||
running: boolean;
|
running: boolean;
|
||||||
op: Promise<unknown>;
|
op: Promise<unknown>;
|
||||||
warnedDisabled: boolean;
|
warnedDisabled: boolean;
|
||||||
|
storeLoadedAtMs: number | null;
|
||||||
|
storeFileMtimeMs: number | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {
|
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {
|
||||||
@@ -58,6 +60,8 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState
|
|||||||
running: false,
|
running: false,
|
||||||
op: Promise.resolve(),
|
op: Promise.resolve(),
|
||||||
warnedDisabled: false,
|
warnedDisabled: false,
|
||||||
|
storeLoadedAtMs: null,
|
||||||
|
storeFileMtimeMs: null,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,20 +1,37 @@
|
|||||||
|
import fs from "node:fs";
|
||||||
import type { CronJob } from "../types.js";
|
import type { CronJob } from "../types.js";
|
||||||
import type { CronServiceState } from "./state.js";
|
import type { CronServiceState } from "./state.js";
|
||||||
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
||||||
import { loadCronStore, saveCronStore } from "../store.js";
|
import { loadCronStore, saveCronStore } from "../store.js";
|
||||||
|
import { recomputeNextRuns } from "./jobs.js";
|
||||||
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
|
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
|
||||||
|
|
||||||
const storeCache = new Map<string, { version: 1; jobs: CronJob[] }>();
|
async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||||
|
try {
|
||||||
|
const stats = await fs.promises.stat(path);
|
||||||
|
return stats.mtimeMs;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function ensureLoaded(state: CronServiceState) {
|
export async function ensureLoaded(state: CronServiceState) {
|
||||||
if (state.store) {
|
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||||
return;
|
|
||||||
}
|
// Check if we need to reload:
|
||||||
const cached = storeCache.get(state.deps.storePath);
|
// - No store loaded yet
|
||||||
if (cached) {
|
// - File modification time has changed
|
||||||
state.store = cached;
|
// - File was modified after we last loaded (external edit)
|
||||||
|
const needsReload =
|
||||||
|
!state.store ||
|
||||||
|
(fileMtimeMs !== null &&
|
||||||
|
state.storeFileMtimeMs !== null &&
|
||||||
|
fileMtimeMs > state.storeFileMtimeMs);
|
||||||
|
|
||||||
|
if (!needsReload) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const loaded = await loadCronStore(state.deps.storePath);
|
const loaded = await loadCronStore(state.deps.storePath);
|
||||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||||
let mutated = false;
|
let mutated = false;
|
||||||
@@ -44,7 +61,12 @@ export async function ensureLoaded(state: CronServiceState) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||||
storeCache.set(state.deps.storePath, state.store);
|
state.storeLoadedAtMs = state.deps.nowMs();
|
||||||
|
state.storeFileMtimeMs = fileMtimeMs;
|
||||||
|
|
||||||
|
// Recompute next runs after loading to ensure accuracy
|
||||||
|
recomputeNextRuns(state);
|
||||||
|
|
||||||
if (mutated) {
|
if (mutated) {
|
||||||
await persist(state);
|
await persist(state);
|
||||||
}
|
}
|
||||||
@@ -69,4 +91,6 @@ export async function persist(state: CronServiceState) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await saveCronStore(state.deps.storePath, state.store);
|
await saveCronStore(state.deps.storePath, state.store);
|
||||||
|
// Update file mtime after save to prevent immediate reload
|
||||||
|
state.storeFileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||||
}
|
}
|
||||||
|
|||||||
64
src/cron/validate-timestamp.ts
Normal file
64
src/cron/validate-timestamp.ts
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
import type { CronSchedule } from "./types.js";
|
||||||
|
|
||||||
|
const ONE_MINUTE_MS = 60 * 1000;
|
||||||
|
const TEN_YEARS_MS = 10 * 365.25 * 24 * 60 * 60 * 1000;
|
||||||
|
|
||||||
|
export type TimestampValidationError = {
|
||||||
|
ok: false;
|
||||||
|
message: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type TimestampValidationSuccess = {
|
||||||
|
ok: true;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type TimestampValidationResult = TimestampValidationSuccess | TimestampValidationError;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates atMs timestamps in cron schedules.
|
||||||
|
* Rejects timestamps that are:
|
||||||
|
* - More than 1 minute in the past
|
||||||
|
* - More than 10 years in the future
|
||||||
|
*/
|
||||||
|
export function validateScheduleTimestamp(
|
||||||
|
schedule: CronSchedule,
|
||||||
|
nowMs: number = Date.now(),
|
||||||
|
): TimestampValidationResult {
|
||||||
|
if (schedule.kind !== "at") {
|
||||||
|
return { ok: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
const atMs = schedule.atMs;
|
||||||
|
|
||||||
|
if (typeof atMs !== "number" || !Number.isFinite(atMs)) {
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
message: `Invalid atMs: must be a finite number (got ${String(atMs)})`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const diffMs = atMs - nowMs;
|
||||||
|
|
||||||
|
// Check if timestamp is in the past (allow 1 minute grace period)
|
||||||
|
if (diffMs < -ONE_MINUTE_MS) {
|
||||||
|
const nowDate = new Date(nowMs).toISOString();
|
||||||
|
const atDate = new Date(atMs).toISOString();
|
||||||
|
const minutesAgo = Math.floor(-diffMs / ONE_MINUTE_MS);
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
message: `atMs is in the past: ${atDate} (${minutesAgo} minutes ago). Current time: ${nowDate}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if timestamp is too far in the future
|
||||||
|
if (diffMs > TEN_YEARS_MS) {
|
||||||
|
const atDate = new Date(atMs).toISOString();
|
||||||
|
const yearsAhead = Math.floor(diffMs / (365.25 * 24 * 60 * 60 * 1000));
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
message: `atMs is too far in the future: ${atDate} (${yearsAhead} years ahead). Maximum allowed: 10 years`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return { ok: true };
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ import type { CronJobCreate, CronJobPatch } from "../../cron/types.js";
|
|||||||
import type { GatewayRequestHandlers } from "./types.js";
|
import type { GatewayRequestHandlers } from "./types.js";
|
||||||
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
|
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
|
||||||
import { readCronRunLogEntries, resolveCronRunLogPath } from "../../cron/run-log.js";
|
import { readCronRunLogEntries, resolveCronRunLogPath } from "../../cron/run-log.js";
|
||||||
|
import { validateScheduleTimestamp } from "../../cron/validate-timestamp.js";
|
||||||
import {
|
import {
|
||||||
ErrorCodes,
|
ErrorCodes,
|
||||||
errorShape,
|
errorShape,
|
||||||
@@ -82,7 +83,17 @@ export const cronHandlers: GatewayRequestHandlers = {
|
|||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const job = await context.cron.add(normalized as unknown as CronJobCreate);
|
const jobCreate = normalized as unknown as CronJobCreate;
|
||||||
|
const timestampValidation = validateScheduleTimestamp(jobCreate.schedule);
|
||||||
|
if (!timestampValidation.ok) {
|
||||||
|
respond(
|
||||||
|
false,
|
||||||
|
undefined,
|
||||||
|
errorShape(ErrorCodes.INVALID_REQUEST, timestampValidation.message),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const job = await context.cron.add(jobCreate);
|
||||||
respond(true, job, undefined);
|
respond(true, job, undefined);
|
||||||
},
|
},
|
||||||
"cron.update": async ({ params, respond, context }) => {
|
"cron.update": async ({ params, respond, context }) => {
|
||||||
@@ -116,7 +127,19 @@ export const cronHandlers: GatewayRequestHandlers = {
|
|||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const job = await context.cron.update(jobId, p.patch as unknown as CronJobPatch);
|
const patch = p.patch as unknown as CronJobPatch;
|
||||||
|
if (patch.schedule) {
|
||||||
|
const timestampValidation = validateScheduleTimestamp(patch.schedule);
|
||||||
|
if (!timestampValidation.ok) {
|
||||||
|
respond(
|
||||||
|
false,
|
||||||
|
undefined,
|
||||||
|
errorShape(ErrorCodes.INVALID_REQUEST, timestampValidation.message),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const job = await context.cron.update(jobId, patch);
|
||||||
respond(true, job, undefined);
|
respond(true, job, undefined);
|
||||||
},
|
},
|
||||||
"cron.remove": async ({ params, respond, context }) => {
|
"cron.remove": async ({ params, respond, context }) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user