From 821520a05718b58bb47c88ba329e3bc01d755514 Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Thu, 5 Feb 2026 13:08:41 -0800 Subject: [PATCH] fix cron scheduling and reminder delivery regressions (#9733) * fix(cron): prevent timer from allowing process exit (fixes #9694) The cron timer was using .unref(), which caused the Node.js event loop to exit or sleep if no other handles were active. This prevented cron jobs from firing in some environments. * fix(cron): infer delivery target for isolated jobs (fixes #9683) When creating isolated agentTurn jobs (e.g. reminders) without explicit delivery options, the job would default to 'announce' but fail to resolve the target conversation. Now, we infer the channel and recipient from the agent's current session key. * fix(cron): enhance delivery inference for threaded sessions and null inputs (#9733) Improves the delivery inference logic in the cron tool to correctly handle threaded session keys and cases where delivery is explicitly set to null. This ensures that the appropriate delivery mode and target are inferred based on the agent's session key, enhancing the reliability of job execution. * fix: preserve telegram topic delivery inference (#9733) (thanks @tyler6204) * fix: simplify cron delivery merge spread (#9733) (thanks @tyler6204) --- CHANGELOG.md | 1 + src/agents/tools/cron-tool.test.ts | 93 ++++++++++++++++++++++++++++ src/agents/tools/cron-tool.ts | 97 ++++++++++++++++++++++++++++++ src/cron/service/timer.ts | 1 - 4 files changed, 191 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a13e6bd..fffe895a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Cron: accept epoch timestamps and 0ms durations in CLI `--at` parsing. - Cron: reload store data when the store file is recreated or mtime changes. - Cron: deliver announce runs directly, honor delivery mode, and respect wakeMode for summaries. (#8540) Thanks @tyler6204. +- Cron: correct announce delivery inference for thread session keys and null delivery inputs. (#9733) Thanks @tyler6204. - Telegram: include forward_from_chat metadata in forwarded messages and harden cron delivery target checks. (#8392) Thanks @Glucksberg. - Telegram: preserve DM topic threadId in deliveryContext. (#9039) Thanks @lailoo. - macOS: fix cron payload summary rendering and ISO 8601 formatter concurrency safety. diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 7e842af94..77ffb36e6 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -233,4 +233,97 @@ describe("cron tool", () => { expect(call.method).toBe("cron.add"); expect(call.params?.agentId).toBeNull(); }); + + it("infers delivery from threaded session keys", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ + agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001", + }); + await tool.execute("call-thread", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + channel: "slack", + to: "general", + }); + }); + + it("preserves telegram forum topics when inferring delivery", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ + agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99", + }); + await tool.execute("call-telegram-topic", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1001234567890:topic:99", + }); + }); + + it("infers delivery when delivery is null", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ agentSessionKey: "agent:main:dm:alice" }); + await tool.execute("call-null-delivery", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: null, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + to: "alice", + }); + }); + + it("does not infer delivery when mode is none", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ agentSessionKey: "agent:main:discord:dm:buddy" }); + await tool.execute("call-none", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: { mode: "none" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ mode: "none" }); + }); }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index f4bf7b236..4c9633144 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -1,6 +1,8 @@ import { Type } from "@sinclair/typebox"; +import type { CronDelivery, CronMessageChannel } from "../../cron/types.js"; import { loadConfig } from "../../config/config.js"; import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js"; +import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; import { truncateUtf16Safe } from "../../utils.js"; import { resolveSessionAgentId } from "../agent-scope.js"; import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; @@ -153,6 +155,72 @@ async function buildReminderContextLines(params: { } } +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function stripThreadSuffixFromSessionKey(sessionKey: string): string { + const normalized = sessionKey.toLowerCase(); + const idx = normalized.lastIndexOf(":thread:"); + if (idx <= 0) { + return sessionKey; + } + const parent = sessionKey.slice(0, idx).trim(); + return parent ? parent : sessionKey; +} + +function inferDeliveryFromSessionKey(agentSessionKey?: string): CronDelivery | null { + const rawSessionKey = agentSessionKey?.trim(); + if (!rawSessionKey) { + return null; + } + const parsed = parseAgentSessionKey(stripThreadSuffixFromSessionKey(rawSessionKey)); + if (!parsed || !parsed.rest) { + return null; + } + const parts = parsed.rest.split(":").filter(Boolean); + if (parts.length === 0) { + return null; + } + const head = parts[0]?.trim().toLowerCase(); + if (!head || head === "main" || head === "subagent" || head === "acp") { + return null; + } + + // buildAgentPeerSessionKey encodes peers as: + // - dm: + // - :dm: + // - ::dm: + // - :group: + // - :channel: + // Threaded sessions append :thread:, which we strip so delivery targets the parent peer. + // NOTE: Telegram forum topics encode as :topic: and should be preserved. + const markerIndex = parts.findIndex( + (part) => part === "dm" || part === "group" || part === "channel", + ); + if (markerIndex === -1) { + return null; + } + const peerId = parts + .slice(markerIndex + 1) + .join(":") + .trim(); + if (!peerId) { + return null; + } + + let channel: CronMessageChannel | undefined; + if (markerIndex >= 1) { + channel = parts[0]?.trim().toLowerCase() as CronMessageChannel; + } + + const delivery: CronDelivery = { mode: "announce", to: peerId }; + if (channel) { + delivery.channel = channel; + } + return delivery; +} + export function createCronTool(opts?: CronToolOptions): AnyAgentTool { return { label: "Cron", @@ -243,6 +311,35 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con (job as { agentId?: string }).agentId = agentId; } } + + // [Fix Issue 3] Infer delivery target from session key for isolated jobs if not provided + if ( + opts?.agentSessionKey && + job && + typeof job === "object" && + "payload" in job && + (job as { payload?: { kind?: string } }).payload?.kind === "agentTurn" + ) { + const deliveryValue = (job as { delivery?: unknown }).delivery; + const delivery = isRecord(deliveryValue) ? deliveryValue : undefined; + const modeRaw = typeof delivery?.mode === "string" ? delivery.mode : ""; + const mode = modeRaw.trim().toLowerCase(); + const hasTarget = + (typeof delivery?.channel === "string" && delivery.channel.trim()) || + (typeof delivery?.to === "string" && delivery.to.trim()); + const shouldInfer = + (deliveryValue == null || delivery) && mode !== "none" && !hasTarget; + if (shouldInfer) { + const inferred = inferDeliveryFromSessionKey(opts.agentSessionKey); + if (inferred) { + (job as { delivery?: unknown }).delivery = { + ...delivery, + ...inferred, + } satisfies CronDelivery; + } + } + } + const contextMessages = typeof params.contextMessages === "number" && Number.isFinite(params.contextMessages) ? params.contextMessages diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index a4b33bf3c..41ee103b9 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -27,7 +27,6 @@ export function armTimer(state: CronServiceState) { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); }); }, clampedDelay); - state.timer.unref?.(); } export async function onTimer(state: CronServiceState) {