feat(cron): enhance delivery handling and testing for isolated jobs

- Introduced new properties for explicit message targeting and message tool disabling in the EmbeddedRunAttemptParams type.
- Updated cron job tests to validate best-effort delivery behavior and handling of delivery failures.
- Added logic to clear delivery settings when switching session targets in cron jobs.
- Improved the resolution of delivery failures and best-effort logic in the isolated agent's run function.

This update enhances the flexibility and reliability of delivery mechanisms in isolated cron jobs, ensuring better handling of message delivery scenarios.
This commit is contained in:
Tyler Yust
2026-02-03 17:51:41 -08:00
committed by Peter Steinberger
parent ef4949b936
commit 64df61f697
9 changed files with 184 additions and 6 deletions

View File

@@ -78,6 +78,10 @@ export type EmbeddedRunAttemptParams = {
onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
/** Require explicit message tool targets (no implicit last-route sends). */
requireExplicitMessageTarget?: boolean;
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
extraSystemPrompt?: string;
streamParams?: AgentStreamParams;
ownerNumbers?: string[];

View File

@@ -82,6 +82,8 @@ describe("cron tool", () => {
expect(call.method).toBe("cron.add");
expect(call.params).toEqual({
name: "wake-up",
enabled: true,
deleteAfterRun: true,
schedule: { kind: "at", at: new Date(123).toISOString() },
sessionTarget: "main",
wakeMode: "next-heartbeat",

View File

@@ -48,7 +48,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const channel = (deliveryChannel ?? payloadChannel ?? "last") as CronMessageChannel;
const channel = deliveryChannel ?? payloadChannel ?? "last";
const to = deliveryTo ?? payloadTo;
if (hasDelivery) {
const resolvedMode = mode ?? "none";

View File

@@ -224,4 +224,85 @@ describe("runCronIsolatedAgentTurn", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
});
it("fails when announce delivery fails and best-effort is disabled", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello from cron" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("error");
expect(res.error).toBe("cron announce delivery failed");
});
});
it("ignores announce delivery failures when best-effort is enabled", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello from cron" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: true,
},
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
});
});
});

View File

@@ -89,6 +89,28 @@ function matchesMessagingToolDeliveryTarget(
return target.to === delivery.to;
}
function resolveCronDeliveryBestEffort(job: CronJob): boolean {
if (typeof job.delivery?.bestEffort === "boolean") {
return job.delivery.bestEffort;
}
if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") {
return job.payload.bestEffortDeliver;
}
return false;
}
function resolveCronDeliveryFailure(
resolved: Awaited<ReturnType<typeof resolveDeliveryTarget>>,
): Error | undefined {
if (resolved.error) {
return resolved.error;
}
if (!resolved.to) {
return new Error("cron delivery target is missing");
}
return undefined;
}
export type RunCronAgentTurnResult = {
status: "ok" | "error" | "skipped";
summary?: string;
@@ -428,6 +450,7 @@ export async function runCronIsolatedAgentTurn(params: {
const firstText = payloads[0]?.text ?? "";
const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
const outputText = pickLastNonEmptyTextFromPayloads(payloads);
const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job);
// Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content).
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
@@ -444,6 +467,19 @@ export async function runCronIsolatedAgentTurn(params: {
);
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
const deliveryFailure = resolveCronDeliveryFailure(resolvedDelivery);
if (deliveryFailure) {
if (!deliveryBestEffort) {
return {
status: "error",
error: deliveryFailure.message,
summary,
outputText,
};
}
logWarn(`[cron:${params.job.id}] ${deliveryFailure.message}`);
return { status: "ok", summary, outputText };
}
const requesterSessionKey = resolveAgentMainSessionKey({
cfg: cfgWithAgentDefaults,
agentId,
@@ -459,7 +495,7 @@ export async function runCronIsolatedAgentTurn(params: {
: undefined;
const outcome: SubagentRunOutcome = { status: "ok" };
const taskLabel = params.job.name?.trim() || "cron job";
await runSubagentAnnounceFlow({
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: cronSession.sessionEntry.sessionId,
requesterSessionKey,
@@ -473,6 +509,14 @@ export async function runCronIsolatedAgentTurn(params: {
label: `Cron: ${taskLabel}`,
outcome,
});
if (!didAnnounce && !deliveryBestEffort) {
return {
status: "error",
error: "cron announce delivery failed",
summary,
outputText,
};
}
}
return { status: "ok", summary, outputText };

View File

@@ -19,8 +19,14 @@ describe("normalizeCronJobCreate", () => {
}) as unknown as Record<string, unknown>;
const payload = normalized.payload as Record<string, unknown>;
expect(payload.channel).toBe("telegram");
expect(payload.channel).toBeUndefined();
expect(payload.deliver).toBeUndefined();
expect("provider" in payload).toBe(false);
const delivery = normalized.delivery as Record<string, unknown>;
expect(delivery.mode).toBe("announce");
expect(delivery.channel).toBe("telegram");
expect(delivery.to).toBe("7200373102");
});
it("trims agentId and drops null", () => {
@@ -72,7 +78,13 @@ describe("normalizeCronJobCreate", () => {
}) as unknown as Record<string, unknown>;
const payload = normalized.payload as Record<string, unknown>;
expect(payload.channel).toBe("telegram");
expect(payload.channel).toBeUndefined();
expect(payload.deliver).toBeUndefined();
const delivery = normalized.delivery as Record<string, unknown>;
expect(delivery.mode).toBe("announce");
expect(delivery.channel).toBe("telegram");
expect(delivery.to).toBe("7200373102");
});
it("coerces ISO schedule.at to normalized ISO (UTC)", () => {

View File

@@ -0,0 +1,32 @@
import { describe, expect, it } from "vitest";
import type { CronJob, CronJobPatch } from "./types.js";
import { applyJobPatch } from "./service/jobs.js";
describe("applyJobPatch", () => {
it("clears delivery when switching to main session", () => {
const now = Date.now();
const job: CronJob = {
id: "job-1",
name: "job-1",
enabled: true,
createdAtMs: now,
updatedAtMs: now,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce", channel: "telegram", to: "123" },
state: {},
};
const patch: CronJobPatch = {
sessionTarget: "main",
payload: { kind: "systemEvent", text: "ping" },
};
expect(() => applyJobPatch(job, patch)).not.toThrow();
expect(job.sessionTarget).toBe("main");
expect(job.payload.kind).toBe("systemEvent");
expect(job.delivery).toBeUndefined();
});
});

View File

@@ -2,8 +2,8 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { loadCronStore } from "../store.js";
import { CronService } from "./service.js";
import { loadCronStore } from "./store.js";
const noopLogger = {
debug: vi.fn(),

View File

@@ -158,6 +158,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
if (patch.delivery) {
job.delivery = mergeCronDelivery(job.delivery, patch.delivery);
}
if (job.sessionTarget === "main" && job.delivery) {
job.delivery = undefined;
}
if (patch.state) {
job.state = { ...job.state, ...patch.state };
}
@@ -250,7 +253,7 @@ function mergeCronDelivery(
};
if (typeof patch.mode === "string") {
next.mode = patch.mode === "deliver" ? "announce" : patch.mode;
next.mode = (patch.mode as string) === "deliver" ? "announce" : patch.mode;
}
if ("channel" in patch) {
const channel = typeof patch.channel === "string" ? patch.channel.trim() : "";