diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a13e6bd..fffe895a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Cron: accept epoch timestamps and 0ms durations in CLI `--at` parsing. - Cron: reload store data when the store file is recreated or mtime changes. - Cron: deliver announce runs directly, honor delivery mode, and respect wakeMode for summaries. (#8540) Thanks @tyler6204. +- Cron: correct announce delivery inference for thread session keys and null delivery inputs. (#9733) Thanks @tyler6204. - Telegram: include forward_from_chat metadata in forwarded messages and harden cron delivery target checks. (#8392) Thanks @Glucksberg. - Telegram: preserve DM topic threadId in deliveryContext. (#9039) Thanks @lailoo. - macOS: fix cron payload summary rendering and ISO 8601 formatter concurrency safety. diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 7e842af94..77ffb36e6 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -233,4 +233,97 @@ describe("cron tool", () => { expect(call.method).toBe("cron.add"); expect(call.params?.agentId).toBeNull(); }); + + it("infers delivery from threaded session keys", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ + agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001", + }); + await tool.execute("call-thread", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + channel: "slack", + to: "general", + }); + }); + + it("preserves telegram forum topics when inferring delivery", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ + agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99", + }); + await tool.execute("call-telegram-topic", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1001234567890:topic:99", + }); + }); + + it("infers delivery when delivery is null", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ agentSessionKey: "agent:main:dm:alice" }); + await tool.execute("call-null-delivery", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: null, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ + mode: "announce", + to: "alice", + }); + }); + + it("does not infer delivery when mode is none", async () => { + callGatewayMock.mockResolvedValueOnce({ ok: true }); + + const tool = createCronTool({ agentSessionKey: "agent:main:discord:dm:buddy" }); + await tool.execute("call-none", { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: { mode: "none" }, + }, + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: { delivery?: { mode?: string; channel?: string; to?: string } }; + }; + expect(call?.params?.delivery).toEqual({ mode: "none" }); + }); }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index f4bf7b236..4c9633144 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -1,6 +1,8 @@ import { Type } from "@sinclair/typebox"; +import type { CronDelivery, CronMessageChannel } from "../../cron/types.js"; import { loadConfig } from "../../config/config.js"; import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js"; +import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; import { truncateUtf16Safe } from "../../utils.js"; import { resolveSessionAgentId } from "../agent-scope.js"; import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; @@ -153,6 +155,72 @@ async function buildReminderContextLines(params: { } } +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function stripThreadSuffixFromSessionKey(sessionKey: string): string { + const normalized = sessionKey.toLowerCase(); + const idx = normalized.lastIndexOf(":thread:"); + if (idx <= 0) { + return sessionKey; + } + const parent = sessionKey.slice(0, idx).trim(); + return parent ? parent : sessionKey; +} + +function inferDeliveryFromSessionKey(agentSessionKey?: string): CronDelivery | null { + const rawSessionKey = agentSessionKey?.trim(); + if (!rawSessionKey) { + return null; + } + const parsed = parseAgentSessionKey(stripThreadSuffixFromSessionKey(rawSessionKey)); + if (!parsed || !parsed.rest) { + return null; + } + const parts = parsed.rest.split(":").filter(Boolean); + if (parts.length === 0) { + return null; + } + const head = parts[0]?.trim().toLowerCase(); + if (!head || head === "main" || head === "subagent" || head === "acp") { + return null; + } + + // buildAgentPeerSessionKey encodes peers as: + // - dm: + // - :dm: + // - ::dm: + // - :group: + // - :channel: + // Threaded sessions append :thread:, which we strip so delivery targets the parent peer. + // NOTE: Telegram forum topics encode as :topic: and should be preserved. + const markerIndex = parts.findIndex( + (part) => part === "dm" || part === "group" || part === "channel", + ); + if (markerIndex === -1) { + return null; + } + const peerId = parts + .slice(markerIndex + 1) + .join(":") + .trim(); + if (!peerId) { + return null; + } + + let channel: CronMessageChannel | undefined; + if (markerIndex >= 1) { + channel = parts[0]?.trim().toLowerCase() as CronMessageChannel; + } + + const delivery: CronDelivery = { mode: "announce", to: peerId }; + if (channel) { + delivery.channel = channel; + } + return delivery; +} + export function createCronTool(opts?: CronToolOptions): AnyAgentTool { return { label: "Cron", @@ -243,6 +311,35 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con (job as { agentId?: string }).agentId = agentId; } } + + // [Fix Issue 3] Infer delivery target from session key for isolated jobs if not provided + if ( + opts?.agentSessionKey && + job && + typeof job === "object" && + "payload" in job && + (job as { payload?: { kind?: string } }).payload?.kind === "agentTurn" + ) { + const deliveryValue = (job as { delivery?: unknown }).delivery; + const delivery = isRecord(deliveryValue) ? deliveryValue : undefined; + const modeRaw = typeof delivery?.mode === "string" ? delivery.mode : ""; + const mode = modeRaw.trim().toLowerCase(); + const hasTarget = + (typeof delivery?.channel === "string" && delivery.channel.trim()) || + (typeof delivery?.to === "string" && delivery.to.trim()); + const shouldInfer = + (deliveryValue == null || delivery) && mode !== "none" && !hasTarget; + if (shouldInfer) { + const inferred = inferDeliveryFromSessionKey(opts.agentSessionKey); + if (inferred) { + (job as { delivery?: unknown }).delivery = { + ...delivery, + ...inferred, + } satisfies CronDelivery; + } + } + } + const contextMessages = typeof params.contextMessages === "number" && Number.isFinite(params.contextMessages) ? params.contextMessages diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index a4b33bf3c..41ee103b9 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -27,7 +27,6 @@ export function armTimer(state: CronServiceState) { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); }); }, clampedDelay); - state.timer.unref?.(); } export async function onTimer(state: CronServiceState) {