diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 471f4111c..8d8542b8c 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -78,6 +78,10 @@ export type EmbeddedRunAttemptParams = { onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onAgentEvent?: (evt: { stream: string; data: Record }) => void; + /** Require explicit message tool targets (no implicit last-route sends). */ + requireExplicitMessageTarget?: boolean; + /** If true, omit the message tool from the tool list. */ + disableMessageTool?: boolean; extraSystemPrompt?: string; streamParams?: AgentStreamParams; ownerNumbers?: string[]; diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index d61a0505a..7e842af94 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -82,6 +82,8 @@ describe("cron tool", () => { expect(call.method).toBe("cron.add"); expect(call.params).toEqual({ name: "wake-up", + enabled: true, + deleteAfterRun: true, schedule: { kind: "at", at: new Date(123).toISOString() }, sessionTarget: "main", wakeMode: "next-heartbeat", diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 6039749a0..c7cbe87f9 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -48,7 +48,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { ); const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); - const channel = (deliveryChannel ?? payloadChannel ?? "last") as CronMessageChannel; + const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; if (hasDelivery) { const resolvedMode = mode ?? "none"; diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 256878b8e..adedfba71 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -224,4 +224,85 @@ describe("runCronIsolatedAgentTurn", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); }); }); + + it("fails when announce delivery fails and best-effort is disabled", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "hello from cron" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("error"); + expect(res.error).toBe("cron announce delivery failed"); + }); + }); + + it("ignores announce delivery failures when best-effort is enabled", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "hello from cron" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + bestEffort: true, + }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index fa395b10e..b0eb580de 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -89,6 +89,28 @@ function matchesMessagingToolDeliveryTarget( return target.to === delivery.to; } +function resolveCronDeliveryBestEffort(job: CronJob): boolean { + if (typeof job.delivery?.bestEffort === "boolean") { + return job.delivery.bestEffort; + } + if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") { + return job.payload.bestEffortDeliver; + } + return false; +} + +function resolveCronDeliveryFailure( + resolved: Awaited>, +): Error | undefined { + if (resolved.error) { + return resolved.error; + } + if (!resolved.to) { + return new Error("cron delivery target is missing"); + } + return undefined; +} + export type RunCronAgentTurnResult = { status: "ok" | "error" | "skipped"; summary?: string; @@ -428,6 +450,7 @@ export async function runCronIsolatedAgentTurn(params: { const firstText = payloads[0]?.text ?? ""; const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); const outputText = pickLastNonEmptyTextFromPayloads(payloads); + const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job); // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg); @@ -444,6 +467,19 @@ export async function runCronIsolatedAgentTurn(params: { ); if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) { + const deliveryFailure = resolveCronDeliveryFailure(resolvedDelivery); + if (deliveryFailure) { + if (!deliveryBestEffort) { + return { + status: "error", + error: deliveryFailure.message, + summary, + outputText, + }; + } + logWarn(`[cron:${params.job.id}] ${deliveryFailure.message}`); + return { status: "ok", summary, outputText }; + } const requesterSessionKey = resolveAgentMainSessionKey({ cfg: cfgWithAgentDefaults, agentId, @@ -459,7 +495,7 @@ export async function runCronIsolatedAgentTurn(params: { : undefined; const outcome: SubagentRunOutcome = { status: "ok" }; const taskLabel = params.job.name?.trim() || "cron job"; - await runSubagentAnnounceFlow({ + const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: agentSessionKey, childRunId: cronSession.sessionEntry.sessionId, requesterSessionKey, @@ -473,6 +509,14 @@ export async function runCronIsolatedAgentTurn(params: { label: `Cron: ${taskLabel}`, outcome, }); + if (!didAnnounce && !deliveryBestEffort) { + return { + status: "error", + error: "cron announce delivery failed", + summary, + outputText, + }; + } } return { status: "ok", summary, outputText }; diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index bec4dfa07..a876e0317 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -19,8 +19,14 @@ describe("normalizeCronJobCreate", () => { }) as unknown as Record; const payload = normalized.payload as Record; - expect(payload.channel).toBe("telegram"); + expect(payload.channel).toBeUndefined(); + expect(payload.deliver).toBeUndefined(); expect("provider" in payload).toBe(false); + + const delivery = normalized.delivery as Record; + expect(delivery.mode).toBe("announce"); + expect(delivery.channel).toBe("telegram"); + expect(delivery.to).toBe("7200373102"); }); it("trims agentId and drops null", () => { @@ -72,7 +78,13 @@ describe("normalizeCronJobCreate", () => { }) as unknown as Record; const payload = normalized.payload as Record; - expect(payload.channel).toBe("telegram"); + expect(payload.channel).toBeUndefined(); + expect(payload.deliver).toBeUndefined(); + + const delivery = normalized.delivery as Record; + expect(delivery.mode).toBe("announce"); + expect(delivery.channel).toBe("telegram"); + expect(delivery.to).toBe("7200373102"); }); it("coerces ISO schedule.at to normalized ISO (UTC)", () => { diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts new file mode 100644 index 000000000..c2080fa06 --- /dev/null +++ b/src/cron/service.jobs.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob, CronJobPatch } from "./types.js"; +import { applyJobPatch } from "./service/jobs.js"; + +describe("applyJobPatch", () => { + it("clears delivery when switching to main session", () => { + const now = Date.now(); + const job: CronJob = { + id: "job-1", + name: "job-1", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "do it" }, + delivery: { mode: "announce", channel: "telegram", to: "123" }, + state: {}, + }; + + const patch: CronJobPatch = { + sessionTarget: "main", + payload: { kind: "systemEvent", text: "ping" }, + }; + + expect(() => applyJobPatch(job, patch)).not.toThrow(); + expect(job.sessionTarget).toBe("main"); + expect(job.payload.kind).toBe("systemEvent"); + expect(job.delivery).toBeUndefined(); + }); +}); diff --git a/src/cron/service.store.migration.test.ts b/src/cron/service.store.migration.test.ts index a0384c9d3..6e0734b15 100644 --- a/src/cron/service.store.migration.test.ts +++ b/src/cron/service.store.migration.test.ts @@ -2,8 +2,8 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { loadCronStore } from "../store.js"; import { CronService } from "./service.js"; +import { loadCronStore } from "./store.js"; const noopLogger = { debug: vi.fn(), diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index bd39237f7..d814d44c6 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -158,6 +158,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { if (patch.delivery) { job.delivery = mergeCronDelivery(job.delivery, patch.delivery); } + if (job.sessionTarget === "main" && job.delivery) { + job.delivery = undefined; + } if (patch.state) { job.state = { ...job.state, ...patch.state }; } @@ -250,7 +253,7 @@ function mergeCronDelivery( }; if (typeof patch.mode === "string") { - next.mode = patch.mode === "deliver" ? "announce" : patch.mode; + next.mode = (patch.mode as string) === "deliver" ? "announce" : patch.mode; } if ("channel" in patch) { const channel = typeof patch.channel === "string" ? patch.channel.trim() : "";