diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index cd6545a2d..b794824eb 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -635,6 +635,7 @@ async function sendSubagentAnnounceDirectly(params: { triggerMessage: string; completionMessage?: string; expectsCompletionMessage: boolean; + bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; directIdempotencyKey: string; @@ -746,6 +747,7 @@ async function sendSubagentAnnounceDirectly(params: { sessionKey: canonicalRequesterSessionKey, message: params.triggerMessage, deliver: !params.requesterIsSubagent, + bestEffortDeliver: params.bestEffortDeliver, channel: params.requesterIsSubagent ? undefined : directOrigin?.channel, accountId: params.requesterIsSubagent ? undefined : directOrigin?.accountId, to: params.requesterIsSubagent ? undefined : directOrigin?.to, @@ -781,6 +783,7 @@ async function deliverSubagentAnnouncement(params: { targetRequesterSessionKey: string; requesterIsSubagent: boolean; expectsCompletionMessage: boolean; + bestEffortDeliver?: boolean; completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; directIdempotencyKey: string; @@ -823,6 +826,7 @@ async function deliverSubagentAnnouncement(params: { requesterIsSubagent: params.requesterIsSubagent, expectsCompletionMessage: params.expectsCompletionMessage, signal: params.signal, + bestEffortDeliver: params.bestEffortDeliver, }); if (direct.delivered || !params.expectsCompletionMessage) { return direct; @@ -990,6 +994,7 @@ export async function runSubagentAnnounceFlow(params: { expectsCompletionMessage?: boolean; spawnMode?: SpawnSubagentMode; signal?: AbortSignal; + bestEffortDeliver?: boolean; }): Promise { let didAnnounce = false; const expectsCompletionMessage = params.expectsCompletionMessage === true; @@ -1247,6 +1252,7 @@ export async function runSubagentAnnounceFlow(params: { targetRequesterSessionKey, requesterIsSubagent, expectsCompletionMessage: expectsCompletionMessage, + bestEffortDeliver: params.bestEffortDeliver, completionRouteMode: completionResolution.routeMode, spawnMode: params.spawnMode, directIdempotencyKey, diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index 39e282614..a44caa3f3 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -141,6 +141,7 @@ export async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: Runtim channel, replyChannel: opts.replyChannel, replyAccountId: opts.replyAccount, + bestEffortDeliver: opts.bestEffortDeliver, timeout: timeoutSeconds, lane: opts.lane, extraSystemPrompt: opts.extraSystemPrompt, diff --git a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts index de5a0b352..b4d76cb34 100644 --- a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts +++ b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts @@ -180,4 +180,65 @@ describe("runCronIsolatedAgentTurn", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); }); }); + + it("uses a unique announce childRunId for each cron run", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { + lastProvider: "telegram", + lastChannel: "telegram", + lastTo: "123", + }); + const deps: CliDeps = { + sendMessageSlack: vi.fn(), + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "final summary" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const cfg = makeCfg(home, storePath); + const job = { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "last" as const }, + }; + + await runCronIsolatedAgentTurn({ + cfg, + deps, + job, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + await new Promise((resolve) => setTimeout(resolve, 5)); + await runCronIsolatedAgentTurn({ + cfg, + deps, + job, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(2); + const firstArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { childRunId?: string } + | undefined; + const secondArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[1]?.[0] as + | { childRunId?: string } + | undefined; + expect(firstArgs?.childRunId).toBeTruthy(); + expect(secondArgs?.childRunId).toBeTruthy(); + expect(secondArgs?.childRunId).not.toBe(firstArgs?.childRunId); + }); + }); }); diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index eaff473fd..8f0116021 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -81,11 +81,13 @@ async function expectExplicitTelegramTargetAnnounce(params: { | { requesterOrigin?: { channel?: string; to?: string }; roundOneReply?: string; + bestEffortDeliver?: boolean; } | undefined; expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); expect(announceArgs?.requesterOrigin?.to).toBe("123"); expect(announceArgs?.roundOneReply).toBe(params.expectedText); + expect(announceArgs?.bestEffortDeliver).toBe(false); expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); } diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 28e35f21e..bc46fcb18 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -788,7 +788,7 @@ export async function runCronIsolatedAgentTurn(params: { } const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: agentSessionKey, - childRunId: `${params.job.id}:${runSessionId}`, + childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`, requesterSessionKey: announceSessionKey, requesterOrigin: { channel: resolvedDelivery.channel, @@ -801,6 +801,9 @@ export async function runCronIsolatedAgentTurn(params: { timeoutMs, cleanup: params.job.deleteAfterRun ? "delete" : "keep", roundOneReply: synthesizedText, + // Keep delivery outcome truthful for cron state: if outbound send fails, + // announce flow must report false so caller can apply best-effort policy. + bestEffortDeliver: false, waitForCompletion: false, startedAt: runStartedAt, endedAt: runEndedAt, diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index ba7e181db..7515b1102 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -605,6 +605,53 @@ describe("Cron issue regressions", () => { cron.stop(); }); + it("keeps telegram delivery target writeback after manual cron.run", async () => { + const store = await makeStorePath(); + const originalTarget = "https://t.me/obviyus"; + const rewrittenTarget = "-10012345/6789"; + const runIsolatedAgentJob = vi.fn(async (params: { job: { id: string } }) => { + const raw = await fs.readFile(store.storePath, "utf-8"); + const persisted = JSON.parse(raw) as { version: number; jobs: CronJob[] }; + const targetJob = persisted.jobs.find((job) => job.id === params.job.id); + if (targetJob?.delivery?.channel === "telegram") { + targetJob.delivery.to = rewrittenTarget; + } + await fs.writeFile(store.storePath, JSON.stringify(persisted, null, 2), "utf-8"); + return { status: "ok" as const, summary: "done", delivered: true }; + }); + + const cron = await startCronForStore({ + storePath: store.storePath, + runIsolatedAgentJob, + }); + const job = await cron.add({ + name: "manual-writeback", + enabled: true, + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { + mode: "announce", + channel: "telegram", + to: originalTarget, + }, + }); + + const result = await cron.run(job.id, "force"); + expect(result).toEqual({ ok: true, ran: true }); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { + jobs: CronJob[]; + }; + const persistedJob = persisted.jobs.find((entry) => entry.id === job.id); + expect(persistedJob?.delivery?.to).toBe(rewrittenTarget); + expect(persistedJob?.state.lastStatus).toBe("ok"); + expect(persistedJob?.state.lastDelivered).toBe(true); + + cron.stop(); + }); + it("#13845: one-shot jobs with terminal statuses do not re-fire on restart", async () => { const store = await makeStorePath(); const pastAt = Date.parse("2026-02-06T09:00:00.000Z"); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index ca2f8d1a9..bc2ec0934 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -44,6 +44,34 @@ export type CronListPageResult = { hasMore: boolean; nextOffset: number | null; }; +function mergeManualRunSnapshotAfterReload(params: { + state: CronServiceState; + jobId: string; + snapshot: { + enabled: boolean; + updatedAtMs: number; + state: CronJob["state"]; + } | null; + removed: boolean; +}) { + if (!params.state.store) { + return; + } + if (params.removed) { + params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId); + return; + } + if (!params.snapshot) { + return; + } + const reloaded = params.state.store.jobs.find((job) => job.id === params.jobId); + if (!reloaded) { + return; + } + reloaded.enabled = params.snapshot.enabled; + reloaded.updatedAtMs = params.snapshot.updatedAtMs; + reloaded.state = params.snapshot.state; +} async function ensureLoadedForRead(state: CronServiceState) { await ensureLoaded(state, { skipRecompute: true }); @@ -397,6 +425,23 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f // Manual runs should not advance other due jobs without executing them. // Use maintenance-only recompute to repair missing values while // preserving existing past-due nextRunAtMs entries for future timer ticks. + const postRunSnapshot = shouldDelete + ? null + : { + enabled: job.enabled, + updatedAtMs: job.updatedAtMs, + state: structuredClone(job.state), + }; + const postRunRemoved = shouldDelete; + // Isolated Telegram send can persist target writeback directly to disk. + // Reload before final persist so manual `cron run` keeps those changes. + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + mergeManualRunSnapshotAfterReload({ + state, + jobId, + snapshot: postRunSnapshot, + removed: postRunRemoved, + }); recomputeNextRunsForMaintenance(state); await persist(state); armTimer(state); diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 41a13855b..b8c883f7f 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -73,6 +73,7 @@ export const AgentParamsSchema = Type.Object( groupChannel: Type.Optional(Type.String()), groupSpace: Type.Optional(Type.String()), timeout: Type.Optional(Type.Integer({ minimum: 0 })), + bestEffortDeliver: Type.Optional(Type.Boolean()), lane: Type.Optional(Type.String()), extraSystemPrompt: Type.Optional(Type.String()), inputProvenance: Type.Optional( diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 543c902e8..fdb903119 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -278,6 +278,26 @@ describe("gateway agent handler", () => { vi.useRealTimers(); }); + it("respects explicit bestEffortDeliver=false for main session runs", async () => { + primeMainAgentRun(); + + await invokeAgent( + { + message: "strict delivery", + agentId: "main", + sessionKey: "agent:main:main", + deliver: true, + bestEffortDeliver: false, + idempotencyKey: "test-strict-delivery", + }, + { reqId: "strict-1" }, + ); + + await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); + const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as Record; + expect(callArgs.bestEffortDeliver).toBe(false); + }); + it("handles missing cliSessionIds gracefully", async () => { mockMainSessionEntry({}); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 1f1e0df19..b24691d82 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -192,6 +192,7 @@ export const agentHandlers: GatewayRequestHandlers = { extraSystemPrompt?: string; idempotencyKey: string; timeout?: number; + bestEffortDeliver?: boolean; label?: string; spawnedBy?: string; inputProvenance?: InputProvenance; @@ -216,6 +217,8 @@ export const agentHandlers: GatewayRequestHandlers = { return; } const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(request.attachments); + const requestedBestEffortDeliver = + typeof request.bestEffortDeliver === "boolean" ? request.bestEffortDeliver : undefined; let message = (request.message ?? "").trim(); let images: Array<{ type: "image"; data: string; mimeType: string }> = []; @@ -310,7 +313,7 @@ export const agentHandlers: GatewayRequestHandlers = { } let resolvedSessionId = request.sessionId?.trim() || undefined; let sessionEntry: SessionEntry | undefined; - let bestEffortDeliver = false; + let bestEffortDeliver = requestedBestEffortDeliver ?? false; let cfgForAgent: ReturnType | undefined; let resolvedSessionKey = requestedSessionKey; let skipTimestampInjection = false; @@ -448,7 +451,9 @@ export const agentHandlers: GatewayRequestHandlers = { sessionKey: canonicalSessionKey, clientRunId: idem, }); - bestEffortDeliver = true; + if (requestedBestEffortDeliver === undefined) { + bestEffortDeliver = true; + } } registerAgentRunContext(idem, { sessionKey: canonicalSessionKey }); } diff --git a/src/telegram/target-writeback.test.ts b/src/telegram/target-writeback.test.ts index a9f1be73d..b32d5b33e 100644 --- a/src/telegram/target-writeback.test.ts +++ b/src/telegram/target-writeback.test.ts @@ -143,4 +143,46 @@ describe("maybePersistResolvedTelegramTarget", () => { expect.any(Object), ); }); + + it("matches username targets case-insensitively", async () => { + readConfigFileSnapshotForWrite.mockResolvedValue({ + snapshot: { + config: { + channels: { + telegram: { + defaultTo: "https://t.me/mychannel", + }, + }, + }, + }, + writeOptions: {}, + }); + loadCronStore.mockResolvedValue({ + version: 1, + jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }], + }); + + await maybePersistResolvedTelegramTarget({ + cfg: {} as OpenClawConfig, + rawTarget: "@MyChannel", + resolvedChatId: "-100123", + }); + + expect(writeConfigFile).toHaveBeenCalledWith( + expect.objectContaining({ + channels: { + telegram: { + defaultTo: "-100123", + }, + }, + }), + expect.any(Object), + ); + expect(saveCronStore).toHaveBeenCalledWith( + "/tmp/cron/jobs.json", + expect.objectContaining({ + jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }], + }), + ); + }); }); diff --git a/src/telegram/target-writeback.ts b/src/telegram/target-writeback.ts index b4a7cd2bd..e8c4d52b2 100644 --- a/src/telegram/target-writeback.ts +++ b/src/telegram/target-writeback.ts @@ -17,9 +17,17 @@ function asObjectRecord(value: unknown): Record | null { return value as Record; } +function normalizeTelegramLookupTargetForMatch(raw: string): string | undefined { + const normalized = normalizeTelegramLookupTarget(raw); + if (!normalized) { + return undefined; + } + return normalized.startsWith("@") ? normalized.toLowerCase() : normalized; +} + function normalizeTelegramTargetForMatch(raw: string): string | undefined { const parsed = parseTelegramTarget(raw); - const normalized = normalizeTelegramLookupTarget(parsed.chatId); + const normalized = normalizeTelegramLookupTargetForMatch(parsed.chatId); if (!normalized) { return undefined; } @@ -49,7 +57,7 @@ function resolveLegacyRewrite(params: { if (normalizeTelegramChatId(parsed.chatId)) { return null; } - const normalized = normalizeTelegramLookupTarget(parsed.chatId); + const normalized = normalizeTelegramLookupTargetForMatch(parsed.chatId); if (!normalized) { return null; }