fix(cron): preserve telegram announce target + delivery truth

This commit is contained in:
Ayaan Zaidi
2026-02-23 10:22:35 +05:30
committed by Ayaan Zaidi
parent dcc52850c3
commit 03122e5933
12 changed files with 246 additions and 5 deletions

View File

@@ -635,6 +635,7 @@ async function sendSubagentAnnounceDirectly(params: {
triggerMessage: string;
completionMessage?: string;
expectsCompletionMessage: boolean;
bestEffortDeliver?: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
@@ -746,6 +747,7 @@ async function sendSubagentAnnounceDirectly(params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: !params.requesterIsSubagent,
bestEffortDeliver: params.bestEffortDeliver,
channel: params.requesterIsSubagent ? undefined : directOrigin?.channel,
accountId: params.requesterIsSubagent ? undefined : directOrigin?.accountId,
to: params.requesterIsSubagent ? undefined : directOrigin?.to,
@@ -781,6 +783,7 @@ async function deliverSubagentAnnouncement(params: {
targetRequesterSessionKey: string;
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
bestEffortDeliver?: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
@@ -823,6 +826,7 @@ async function deliverSubagentAnnouncement(params: {
requesterIsSubagent: params.requesterIsSubagent,
expectsCompletionMessage: params.expectsCompletionMessage,
signal: params.signal,
bestEffortDeliver: params.bestEffortDeliver,
});
if (direct.delivered || !params.expectsCompletionMessage) {
return direct;
@@ -990,6 +994,7 @@ export async function runSubagentAnnounceFlow(params: {
expectsCompletionMessage?: boolean;
spawnMode?: SpawnSubagentMode;
signal?: AbortSignal;
bestEffortDeliver?: boolean;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -1247,6 +1252,7 @@ export async function runSubagentAnnounceFlow(params: {
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
bestEffortDeliver: params.bestEffortDeliver,
completionRouteMode: completionResolution.routeMode,
spawnMode: params.spawnMode,
directIdempotencyKey,

View File

@@ -141,6 +141,7 @@ export async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: Runtim
channel,
replyChannel: opts.replyChannel,
replyAccountId: opts.replyAccount,
bestEffortDeliver: opts.bestEffortDeliver,
timeout: timeoutSeconds,
lane: opts.lane,
extraSystemPrompt: opts.extraSystemPrompt,

View File

@@ -180,4 +180,65 @@ describe("runCronIsolatedAgentTurn", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
});
it("uses a unique announce childRunId for each cron run", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, {
lastProvider: "telegram",
lastChannel: "telegram",
lastTo: "123",
});
const deps: CliDeps = {
sendMessageSlack: vi.fn(),
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "final summary" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const cfg = makeCfg(home, storePath);
const job = {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "last" as const },
};
await runCronIsolatedAgentTurn({
cfg,
deps,
job,
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
await new Promise((resolve) => setTimeout(resolve, 5));
await runCronIsolatedAgentTurn({
cfg,
deps,
job,
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(2);
const firstArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { childRunId?: string }
| undefined;
const secondArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[1]?.[0] as
| { childRunId?: string }
| undefined;
expect(firstArgs?.childRunId).toBeTruthy();
expect(secondArgs?.childRunId).toBeTruthy();
expect(secondArgs?.childRunId).not.toBe(firstArgs?.childRunId);
});
});
});

View File

@@ -81,11 +81,13 @@ async function expectExplicitTelegramTargetAnnounce(params: {
| {
requesterOrigin?: { channel?: string; to?: string };
roundOneReply?: string;
bestEffortDeliver?: boolean;
}
| undefined;
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
expect(announceArgs?.roundOneReply).toBe(params.expectedText);
expect(announceArgs?.bestEffortDeliver).toBe(false);
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
});
}

View File

@@ -788,7 +788,7 @@ export async function runCronIsolatedAgentTurn(params: {
}
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: `${params.job.id}:${runSessionId}`,
childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`,
requesterSessionKey: announceSessionKey,
requesterOrigin: {
channel: resolvedDelivery.channel,
@@ -801,6 +801,9 @@ export async function runCronIsolatedAgentTurn(params: {
timeoutMs,
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
roundOneReply: synthesizedText,
// Keep delivery outcome truthful for cron state: if outbound send fails,
// announce flow must report false so caller can apply best-effort policy.
bestEffortDeliver: false,
waitForCompletion: false,
startedAt: runStartedAt,
endedAt: runEndedAt,

View File

@@ -605,6 +605,53 @@ describe("Cron issue regressions", () => {
cron.stop();
});
it("keeps telegram delivery target writeback after manual cron.run", async () => {
const store = await makeStorePath();
const originalTarget = "https://t.me/obviyus";
const rewrittenTarget = "-10012345/6789";
const runIsolatedAgentJob = vi.fn(async (params: { job: { id: string } }) => {
const raw = await fs.readFile(store.storePath, "utf-8");
const persisted = JSON.parse(raw) as { version: number; jobs: CronJob[] };
const targetJob = persisted.jobs.find((job) => job.id === params.job.id);
if (targetJob?.delivery?.channel === "telegram") {
targetJob.delivery.to = rewrittenTarget;
}
await fs.writeFile(store.storePath, JSON.stringify(persisted, null, 2), "utf-8");
return { status: "ok" as const, summary: "done", delivered: true };
});
const cron = await startCronForStore({
storePath: store.storePath,
runIsolatedAgentJob,
});
const job = await cron.add({
name: "manual-writeback",
enabled: true,
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "test" },
delivery: {
mode: "announce",
channel: "telegram",
to: originalTarget,
},
});
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
jobs: CronJob[];
};
const persistedJob = persisted.jobs.find((entry) => entry.id === job.id);
expect(persistedJob?.delivery?.to).toBe(rewrittenTarget);
expect(persistedJob?.state.lastStatus).toBe("ok");
expect(persistedJob?.state.lastDelivered).toBe(true);
cron.stop();
});
it("#13845: one-shot jobs with terminal statuses do not re-fire on restart", async () => {
const store = await makeStorePath();
const pastAt = Date.parse("2026-02-06T09:00:00.000Z");

View File

@@ -44,6 +44,34 @@ export type CronListPageResult = {
hasMore: boolean;
nextOffset: number | null;
};
function mergeManualRunSnapshotAfterReload(params: {
state: CronServiceState;
jobId: string;
snapshot: {
enabled: boolean;
updatedAtMs: number;
state: CronJob["state"];
} | null;
removed: boolean;
}) {
if (!params.state.store) {
return;
}
if (params.removed) {
params.state.store.jobs = params.state.store.jobs.filter((job) => job.id !== params.jobId);
return;
}
if (!params.snapshot) {
return;
}
const reloaded = params.state.store.jobs.find((job) => job.id === params.jobId);
if (!reloaded) {
return;
}
reloaded.enabled = params.snapshot.enabled;
reloaded.updatedAtMs = params.snapshot.updatedAtMs;
reloaded.state = params.snapshot.state;
}
async function ensureLoadedForRead(state: CronServiceState) {
await ensureLoaded(state, { skipRecompute: true });
@@ -397,6 +425,23 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
// Manual runs should not advance other due jobs without executing them.
// Use maintenance-only recompute to repair missing values while
// preserving existing past-due nextRunAtMs entries for future timer ticks.
const postRunSnapshot = shouldDelete
? null
: {
enabled: job.enabled,
updatedAtMs: job.updatedAtMs,
state: structuredClone(job.state),
};
const postRunRemoved = shouldDelete;
// Isolated Telegram send can persist target writeback directly to disk.
// Reload before final persist so manual `cron run` keeps those changes.
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
mergeManualRunSnapshotAfterReload({
state,
jobId,
snapshot: postRunSnapshot,
removed: postRunRemoved,
});
recomputeNextRunsForMaintenance(state);
await persist(state);
armTimer(state);

View File

@@ -73,6 +73,7 @@ export const AgentParamsSchema = Type.Object(
groupChannel: Type.Optional(Type.String()),
groupSpace: Type.Optional(Type.String()),
timeout: Type.Optional(Type.Integer({ minimum: 0 })),
bestEffortDeliver: Type.Optional(Type.Boolean()),
lane: Type.Optional(Type.String()),
extraSystemPrompt: Type.Optional(Type.String()),
inputProvenance: Type.Optional(

View File

@@ -278,6 +278,26 @@ describe("gateway agent handler", () => {
vi.useRealTimers();
});
it("respects explicit bestEffortDeliver=false for main session runs", async () => {
primeMainAgentRun();
await invokeAgent(
{
message: "strict delivery",
agentId: "main",
sessionKey: "agent:main:main",
deliver: true,
bestEffortDeliver: false,
idempotencyKey: "test-strict-delivery",
},
{ reqId: "strict-1" },
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as Record<string, unknown>;
expect(callArgs.bestEffortDeliver).toBe(false);
});
it("handles missing cliSessionIds gracefully", async () => {
mockMainSessionEntry({});

View File

@@ -192,6 +192,7 @@ export const agentHandlers: GatewayRequestHandlers = {
extraSystemPrompt?: string;
idempotencyKey: string;
timeout?: number;
bestEffortDeliver?: boolean;
label?: string;
spawnedBy?: string;
inputProvenance?: InputProvenance;
@@ -216,6 +217,8 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(request.attachments);
const requestedBestEffortDeliver =
typeof request.bestEffortDeliver === "boolean" ? request.bestEffortDeliver : undefined;
let message = (request.message ?? "").trim();
let images: Array<{ type: "image"; data: string; mimeType: string }> = [];
@@ -310,7 +313,7 @@ export const agentHandlers: GatewayRequestHandlers = {
}
let resolvedSessionId = request.sessionId?.trim() || undefined;
let sessionEntry: SessionEntry | undefined;
let bestEffortDeliver = false;
let bestEffortDeliver = requestedBestEffortDeliver ?? false;
let cfgForAgent: ReturnType<typeof loadConfig> | undefined;
let resolvedSessionKey = requestedSessionKey;
let skipTimestampInjection = false;
@@ -448,7 +451,9 @@ export const agentHandlers: GatewayRequestHandlers = {
sessionKey: canonicalSessionKey,
clientRunId: idem,
});
bestEffortDeliver = true;
if (requestedBestEffortDeliver === undefined) {
bestEffortDeliver = true;
}
}
registerAgentRunContext(idem, { sessionKey: canonicalSessionKey });
}

View File

@@ -143,4 +143,46 @@ describe("maybePersistResolvedTelegramTarget", () => {
expect.any(Object),
);
});
it("matches username targets case-insensitively", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "https://t.me/mychannel",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }],
});
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "@MyChannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
},
},
}),
expect.any(Object),
);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }],
}),
);
});
});

View File

@@ -17,9 +17,17 @@ function asObjectRecord(value: unknown): Record<string, unknown> | null {
return value as Record<string, unknown>;
}
function normalizeTelegramLookupTargetForMatch(raw: string): string | undefined {
const normalized = normalizeTelegramLookupTarget(raw);
if (!normalized) {
return undefined;
}
return normalized.startsWith("@") ? normalized.toLowerCase() : normalized;
}
function normalizeTelegramTargetForMatch(raw: string): string | undefined {
const parsed = parseTelegramTarget(raw);
const normalized = normalizeTelegramLookupTarget(parsed.chatId);
const normalized = normalizeTelegramLookupTargetForMatch(parsed.chatId);
if (!normalized) {
return undefined;
}
@@ -49,7 +57,7 @@ function resolveLegacyRewrite(params: {
if (normalizeTelegramChatId(parsed.chatId)) {
return null;
}
const normalized = normalizeTelegramLookupTarget(parsed.chatId);
const normalized = normalizeTelegramLookupTargetForMatch(parsed.chatId);
if (!normalized) {
return null;
}