From 63b4c500d9aed15f7e4292eab3da3b50ea5d320d Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sun, 22 Feb 2026 10:04:33 +0530 Subject: [PATCH] fix: prevent Telegram preview stream cross-edit race (#23202) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 529abf209d56d9f991a7d308f4ecce78ac992e94 Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus --- CHANGELOG.md | 1 + src/telegram/bot-message-dispatch.test.ts | 187 +++++++++++++++++++++- src/telegram/bot-message-dispatch.ts | 122 ++++++++++---- src/telegram/draft-stream.test.ts | 74 ++++++--- src/telegram/draft-stream.ts | 22 ++- 5 files changed, 346 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 72daec0c4..096018b7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus. - Slack/Slash commands: preserve the Bolt app receiver when registering external select options handlers so monitor startup does not crash on runtimes that require bound `app.options` calls. (#23209) Thanks @0xgaia. - Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester. - Cron/Gateway: keep `cron.list` and `cron.status` responsive during startup catch-up by avoiding a long-held cron lock while missed jobs execute. (#23106) Thanks @jayleekr. diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index ede7a1288..720b15d3b 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -137,7 +137,13 @@ describe("dispatchTelegramMessage draft streaming", () => { } function createBot(): Bot { - return { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot; + return { + api: { + sendMessage: vi.fn(), + editMessageText: vi.fn(), + deleteMessage: vi.fn().mockResolvedValue(true), + }, + } as unknown as Bot; } function createRuntime(): Parameters[0]["runtime"] { @@ -154,10 +160,12 @@ describe("dispatchTelegramMessage draft streaming", () => { context: TelegramMessageContext; telegramCfg?: Parameters[0]["telegramCfg"]; streamMode?: Parameters[0]["streamMode"]; + bot?: Bot; }) { + const bot = params.bot ?? createBot(); await dispatchTelegramMessage({ context: params.context, - bot: createBot(), + bot, cfg: {}, runtime: createRuntime(), replyToMode: "first", @@ -577,6 +585,141 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliverReplies).not.toHaveBeenCalled(); }); + it("maps finals correctly when first preview id resolves after message boundary", async () => { + let answerMessageId: number | undefined; + let answerDraftParams: + | { + onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; + } + | undefined; + const answerDraftStream = { + update: vi.fn().mockImplementation((text: string) => { + if (text.includes("Message B")) { + answerMessageId = 1002; + } + }), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => answerMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + answerMessageId = undefined; + }), + }; + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce((params) => { + answerDraftParams = params as typeof answerDraftParams; + return answerDraftStream; + }) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + // Simulate late resolution of message A preview ID after boundary rotation. + answerDraftParams?.onSupersededPreview?.({ + messageId: 1001, + textSnapshot: "Message A partial", + }); + + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + + it("maps finals correctly when archived preview id arrives during final flush", async () => { + let answerMessageId: number | undefined; + let answerDraftParams: + | { + onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; + } + | undefined; + let emittedSupersededPreview = false; + const answerDraftStream = { + update: vi.fn().mockImplementation((text: string) => { + if (text.includes("Message B")) { + answerMessageId = 1002; + } + }), + flush: vi.fn().mockImplementation(async () => { + if (!emittedSupersededPreview) { + emittedSupersededPreview = true; + answerDraftParams?.onSupersededPreview?.({ + messageId: 1001, + textSnapshot: "Message A partial", + }); + } + }), + messageId: vi.fn().mockImplementation(() => answerMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + answerMessageId = undefined; + }), + }; + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce((params) => { + answerDraftParams = params as typeof answerDraftParams; + return answerDraftStream; + }) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it.each(["block", "partial"] as const)( "splits reasoning lane only when a later reasoning block starts (%s mode)", async (streamMode) => { @@ -604,6 +747,46 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); + it("cleans superseded reasoning previews after lane rotation", async () => { + let reasoningDraftParams: + | { + onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; + } + | undefined; + const answerDraftStream = createDraftStream(999); + const reasoningDraftStream = createDraftStream(111); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce((params) => { + reasoningDraftParams = params as typeof reasoningDraftParams; + return reasoningDraftStream; + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); + await replyOptions?.onReasoningEnd?.(); + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" }); + reasoningDraftParams?.onSupersededPreview?.({ + messageId: 4444, + textSnapshot: "Reasoning:\n_first block_", + }); + await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + const bot = createBot(); + await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); + + expect(reasoningDraftParams?.onSupersededPreview).toBeTypeOf("function"); + const deleteMessageCalls = ( + bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } } + ).deleteMessage.mock.calls; + expect(deleteMessageCalls).toContainEqual([123, 4444]); + }); + it.each(["block", "partial"] as const)( "does not split reasoning lane on reasoning end without a later reasoning block (%s mode)", async (streamMode) => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 71e535280..373bb66a5 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -155,7 +155,10 @@ export const dispatchTelegramMessage = async ({ lastPartialText: string; hasStreamedMessage: boolean; }; - const createDraftLane = (enabled: boolean): DraftLaneState => { + type ArchivedPreview = { messageId: number; textSnapshot: string }; + const archivedAnswerPreviews: ArchivedPreview[] = []; + const archivedReasoningPreviewIds: number[] = []; + const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => { const stream = enabled ? createTelegramDraftStream({ api: bot.api, @@ -165,6 +168,21 @@ export const dispatchTelegramMessage = async ({ replyToMessageId: draftReplyToMessageId, minInitialChars: draftMinInitialChars, renderText: renderDraftPreview, + onSupersededPreview: + laneName === "answer" || laneName === "reasoning" + ? (preview) => { + if (laneName === "reasoning") { + if (!archivedReasoningPreviewIds.includes(preview.messageId)) { + archivedReasoningPreviewIds.push(preview.messageId); + } + return; + } + archivedAnswerPreviews.push({ + messageId: preview.messageId, + textSnapshot: preview.textSnapshot, + }); + } + : undefined, log: logVerbose, warn: logVerbose, }) @@ -176,15 +194,13 @@ export const dispatchTelegramMessage = async ({ }; }; const lanes: Record = { - answer: createDraftLane(canStreamAnswerDraft), - reasoning: createDraftLane(canStreamReasoningDraft), + answer: createDraftLane("answer", canStreamAnswerDraft), + reasoning: createDraftLane("reasoning", canStreamReasoningDraft), }; const answerLane = lanes.answer; const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; const reasoningStepState = createTelegramReasoningStepState(); - type ArchivedPreview = { messageId: number; textSnapshot: string }; - const archivedAnswerPreviews: ArchivedPreview[] = []; type SplitLaneSegment = { lane: LaneName; text: string }; const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => { const split = splitTelegramReasoningText(text); @@ -434,6 +450,43 @@ export const dispatchTelegramMessage = async ({ return result.delivered; }; type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; + const consumeArchivedAnswerPreviewForFinal = async (params: { + lane: DraftLaneState; + text: string; + payload: ReplyPayload; + previewButtons?: TelegramInlineButtons; + canEditViaPreview: boolean; + }): Promise => { + const archivedPreview = archivedAnswerPreviews.shift(); + if (!archivedPreview) { + return undefined; + } + if (params.canEditViaPreview) { + const finalized = await tryUpdatePreviewForLane({ + lane: params.lane, + laneName: "answer", + text: params.text, + previewButtons: params.previewButtons, + stopBeforeEdit: false, + skipRegressive: "existingOnly", + context: "final", + previewMessageId: archivedPreview.messageId, + previewTextSnapshot: archivedPreview.textSnapshot, + }); + if (finalized) { + return "preview-finalized"; + } + } + try { + await bot.api.deleteMessage(chatId, archivedPreview.messageId); + } catch (err) { + logVerbose( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + const delivered = await sendPayload(applyTextToPayload(params.payload, params.text)); + return delivered ? "sent" : "skipped"; + }; const deliverLaneText = async (params: { laneName: LaneName; text: string; @@ -456,38 +509,32 @@ export const dispatchTelegramMessage = async ({ !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; if (infoKind === "final") { - if (laneName === "answer" && archivedAnswerPreviews.length > 0) { - const archivedPreview = archivedAnswerPreviews.shift(); - if (archivedPreview) { - if (canEditViaPreview) { - const finalized = await tryUpdatePreviewForLane({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit: false, - skipRegressive: "existingOnly", - context: "final", - previewMessageId: archivedPreview.messageId, - previewTextSnapshot: archivedPreview.textSnapshot, - }); - if (finalized) { - return "preview-finalized"; - } - } - try { - await bot.api.deleteMessage(chatId, archivedPreview.messageId); - } catch (err) { - logVerbose( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - const delivered = await sendPayload(applyTextToPayload(payload, text)); - return delivered ? "sent" : "skipped"; + if (laneName === "answer") { + const archivedResult = await consumeArchivedAnswerPreviewForFinal({ + lane, + text, + payload, + previewButtons, + canEditViaPreview, + }); + if (archivedResult) { + return archivedResult; } } if (canEditViaPreview && !finalizedPreviewByLane[laneName]) { await flushDraftLane(lane); + if (laneName === "answer") { + const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ + lane, + text, + payload, + previewButtons, + canEditViaPreview, + }); + if (archivedResultAfterFlush) { + return archivedResultAfterFlush; + } + } const finalized = await tryUpdatePreviewForLane({ lane, laneName, @@ -735,6 +782,15 @@ export const dispatchTelegramMessage = async ({ ); } } + for (const messageId of archivedReasoningPreviewIds) { + try { + await bot.api.deleteMessage(chatId, messageId); + } catch (err) { + logVerbose( + `telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`, + ); + } + } } let sentFallback = false; if ( diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index fda42e9e9..0031fed4d 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -1,3 +1,4 @@ +import type { Bot } from "grammy"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createTelegramDraftStream } from "./draft-stream.js"; @@ -18,8 +19,7 @@ function createThreadedDraftStream( thread: { id: number; scope: "forum" | "dm" }, ) { return createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, thread, }); @@ -109,8 +109,7 @@ describe("createTelegramDraftStream", () => { deleteMessage: vi.fn().mockResolvedValue(true), }; const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, }); @@ -146,8 +145,7 @@ describe("createTelegramDraftStream", () => { deleteMessage: vi.fn().mockResolvedValue(true), }; const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, throttleMs: 1000, }); @@ -167,11 +165,47 @@ describe("createTelegramDraftStream", () => { } }); + it("does not rebind to an old message when forceNewMessage races an in-flight send", async () => { + let resolveFirstSend: ((value: { message_id: number }) => void) | undefined; + const firstSend = new Promise<{ message_id: number }>((resolve) => { + resolveFirstSend = resolve; + }); + const api = { + sendMessage: vi.fn().mockReturnValueOnce(firstSend).mockResolvedValueOnce({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const onSupersededPreview = vi.fn(); + const stream = createTelegramDraftStream({ + api: api as unknown as Bot["api"], + chatId: 123, + onSupersededPreview, + }); + + stream.update("Message A partial"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1)); + + // Rotate to message B before message A send resolves. + stream.forceNewMessage(); + stream.update("Message B partial"); + + resolveFirstSend?.({ message_id: 17 }); + await stream.flush(); + + expect(onSupersededPreview).toHaveBeenCalledWith({ + messageId: 17, + textSnapshot: "Message A partial", + parseMode: undefined, + }); + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Message B partial", undefined); + expect(api.editMessageText).not.toHaveBeenCalledWith(123, 17, "Message B partial"); + }); + it("supports rendered previews with parse_mode", async () => { const api = createMockDraftApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, renderText: (text) => ({ text: `${text}`, parseMode: "HTML" }), }); @@ -191,8 +225,7 @@ describe("createTelegramDraftStream", () => { const api = createMockDraftApi(); const warn = vi.fn(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, maxChars: 100, renderText: () => ({ text: `${"<".repeat(120)}`, parseMode: "HTML" }), @@ -229,8 +262,7 @@ describe("draft stream initial message debounce", () => { it("sends immediately on stop() even with 1 character", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -245,8 +277,7 @@ describe("draft stream initial message debounce", () => { it("sends immediately on stop() with short sentence", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -263,8 +294,7 @@ describe("draft stream initial message debounce", () => { it("does not send first message below threshold", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -278,8 +308,7 @@ describe("draft stream initial message debounce", () => { it("sends first message when reaching threshold", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -294,8 +323,7 @@ describe("draft stream initial message debounce", () => { it("works with longer text above threshold", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -311,8 +339,7 @@ describe("draft stream initial message debounce", () => { it("edits normally after first message is sent", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, minInitialChars: 30, }); @@ -335,8 +362,7 @@ describe("draft stream initial message debounce", () => { it("sends immediately without minInitialChars set (backward compatible)", async () => { const api = createMockApi(); const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, + api: api as unknown as Bot["api"], chatId: 123, // no minInitialChars (backward-compatible behavior) }); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index e4fb2ca41..bcab90563 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -20,6 +20,12 @@ type TelegramDraftPreview = { parseMode?: "HTML"; }; +type SupersededTelegramPreview = { + messageId: number; + textSnapshot: string; + parseMode?: "HTML"; +}; + export function createTelegramDraftStream(params: { api: Bot["api"]; chatId: number; @@ -31,6 +37,8 @@ export function createTelegramDraftStream(params: { minInitialChars?: number; /** Optional preview renderer (e.g. markdown -> HTML + parse mode). */ renderText?: (text: string) => TelegramDraftPreview; + /** Called when a late send resolves after forceNewMessage() switched generations. */ + onSupersededPreview?: (preview: SupersededTelegramPreview) => void; log?: (message: string) => void; warn?: (message: string) => void; }): TelegramDraftStream { @@ -52,6 +60,7 @@ export function createTelegramDraftStream(params: { let lastSentParseMode: "HTML" | undefined; let stopped = false; let isFinal = false; + let generation = 0; const sendOrEditStreamMessage = async (text: string): Promise => { // Allow final flush even if stopped (e.g., after clear()). @@ -80,6 +89,7 @@ export function createTelegramDraftStream(params: { if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) { return true; } + const sendGeneration = generation; // Debounce first preview send for better push notification quality. if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) { @@ -114,7 +124,16 @@ export function createTelegramDraftStream(params: { params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); return false; } - streamMessageId = Math.trunc(sentMessageId); + const normalizedMessageId = Math.trunc(sentMessageId); + if (sendGeneration !== generation) { + params.onSupersededPreview?.({ + messageId: normalizedMessageId, + textSnapshot: renderedText, + parseMode: renderedParseMode, + }); + return true; + } + streamMessageId = normalizedMessageId; return true; } catch (err) { stopped = true; @@ -163,6 +182,7 @@ export function createTelegramDraftStream(params: { }; const forceNewMessage = () => { + generation += 1; streamMessageId = undefined; lastSentText = ""; lastSentParseMode = undefined;