From 244ed9db391aa964567ab38c868beed2b595e70a Mon Sep 17 00:00:00 2001 From: yinghaosang Date: Mon, 16 Feb 2026 20:40:24 +0800 Subject: [PATCH] fix(telegram): draft stream preview not threaded when replyToMode is on (#17880) (#17928) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: cfd4181a237153b2df0535d5dcec32a866e60515 Co-authored-by: yinghaosang <261132136+yinghaosang@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus --- CHANGELOG.md | 1 + src/telegram/bot-message-dispatch.test.ts | 72 ++++++++++++++++++- src/telegram/bot-message-dispatch.ts | 86 +++++++++++++++-------- src/telegram/draft-stream.test.ts | 78 ++++++++++++++++++++ src/telegram/draft-stream.ts | 7 +- 5 files changed, 211 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ec200ba6..a0db9f5c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai - Telegram: replace inbound `` placeholder with successful preflight voice transcript in message body context, preventing placeholder-only prompt bodies for mention-gated voice messages. (#16789) Thanks @Limitless2023. - Telegram: retry inbound media `getFile` calls (3 attempts with backoff) and gracefully fall back to placeholder-only processing when retries fail, preventing dropped voice/media messages on transient Telegram network errors. (#16154) Thanks @yinghaosang. - Telegram: finalize streaming preview replies in place instead of sending a second final message, preventing duplicate Telegram assistant outputs at stream completion. (#17218) Thanks @obviyus. +- Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang. - Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk. - Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus. - Discord: preserve channel session continuity when runtime payloads omit `message.channelId` by falling back to event/raw `channel_id` values for routing/session keys, so same-channel messages keep history across turns/restarts. Also align diagnostics so active Discord runs no longer appear as `sessionKey=unknown`. (#17622) Thanks @shakkernerd. diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index c5c98f3e0..2b306c23e 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -114,13 +114,14 @@ describe("dispatchTelegramMessage draft streaming", () => { context: TelegramMessageContext; telegramCfg?: Parameters[0]["telegramCfg"]; streamMode?: Parameters[0]["streamMode"]; + replyToMode?: Parameters[0]["replyToMode"]; }) { await dispatchTelegramMessage({ context: params.context, bot: createBot(), cfg: {}, runtime: createRuntime(), - replyToMode: "first", + replyToMode: params.replyToMode ?? "first", streamMode: params.streamMode ?? "partial", textLimit: 4096, telegramCfg: params.telegramCfg ?? {}, @@ -151,6 +152,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, + replyToMessageId: 456, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); @@ -215,6 +217,52 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.stop).toHaveBeenCalled(); }); + it("uses only the latest final payload when multiple finals are emitted", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Okay." }); + await dispatcherOptions.deliver({ text: "Ok" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Okay." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext() }); + + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Okay.", expect.any(Object)); + expect(deliverReplies).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); + expect(draftStream.stop).toHaveBeenCalled(); + }); + + it("ignores transient shorter partial prefixes to avoid preview punctuation flicker", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Sure." }); + await replyOptions?.onPartialReply?.({ text: "Sure" }); + await replyOptions?.onPartialReply?.({ text: "Sure." }); + await dispatcherOptions.deliver({ text: "Sure." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext() }); + + expect(draftStream.update).toHaveBeenCalledTimes(1); + expect(draftStream.update).toHaveBeenCalledWith("Sure."); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Sure.", expect.any(Object)); + }); + it("falls back to normal delivery when preview final is too long to edit", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -259,4 +307,26 @@ describe("dispatchTelegramMessage draft streaming", () => { }), ); }); + + it("omits replyToMessageId from draft stream when replyToMode is off", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext(), + replyToMode: "off", + }); + + expect(createTelegramDraftStream).toHaveBeenCalledWith( + expect.objectContaining({ + chatId: 123, + replyToMessageId: undefined, + }), + ); + }); }); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 361a89718..bfa2d0143 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -91,12 +91,15 @@ export const dispatchTelegramMessage = async ({ ? telegramCfg.blockStreaming : cfg.agents?.defaults?.blockStreamingDefault === "on"; const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled; + const draftReplyToMessageId = + replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; const draftStream = canStreamDraft ? createTelegramDraftStream({ api: bot.api, chatId, maxChars: draftMaxChars, thread: threadSpec, + replyToMessageId: draftReplyToMessageId, log: logVerbose, warn: logVerbose, }) @@ -117,6 +120,16 @@ export const dispatchTelegramMessage = async ({ return; } if (streamMode === "partial") { + // Some providers briefly emit a shorter prefix snapshot (for example + // "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid + // visible punctuation flicker. + if ( + lastPartialText && + lastPartialText.startsWith(text) && + text.length < lastPartialText.length + ) { + return; + } lastPartialText = text; draftStream.update(text); return; @@ -281,42 +294,53 @@ export const dispatchTelegramMessage = async ({ await flushDraft(); const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; const previewMessageId = draftStream?.messageId(); - const previewButtons = ( - payload.channelData?.telegram as - | { buttons?: Array> } - | undefined - )?.buttons; - let draftStoppedForPreviewEdit = false; - if (!hasMedia && payload.text && typeof previewMessageId === "number") { - const canFinalizeViaPreviewEdit = payload.text.length <= draftMaxChars; - if (canFinalizeViaPreviewEdit) { - draftStream?.stop(); - draftStoppedForPreviewEdit = true; - try { - await editMessageTelegram(chatId, previewMessageId, payload.text, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - finalizedViaPreviewMessage = true; - deliveryState.delivered = true; - return; - } catch (err) { - logVerbose( - `telegram: preview final edit failed; falling back to standard send (${String(err)})`, - ); - } - } else { + const finalText = payload.text; + const canFinalizeViaPreviewEdit = + !hasMedia && + typeof finalText === "string" && + finalText.length > 0 && + typeof previewMessageId === "number" && + finalText.length <= draftMaxChars; + if (canFinalizeViaPreviewEdit) { + draftStream?.stop(); + const currentPreviewText = streamMode === "block" ? draftText : lastPartialText; + if ( + currentPreviewText && + currentPreviewText.startsWith(finalText) && + finalText.length < currentPreviewText.length + ) { + // Ignore regressive final edits (e.g., "Okay." -> "Ok"), which + // can appear transiently in some provider streams. + return; + } + const previewButtons = ( + payload.channelData?.telegram as + | { buttons?: Array> } + | undefined + )?.buttons; + try { + await editMessageTelegram(chatId, previewMessageId, finalText, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, + }); + finalizedViaPreviewMessage = true; + deliveryState.delivered = true; + return; + } catch (err) { logVerbose( - `telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`, + `telegram: preview final edit failed; falling back to standard send (${String(err)})`, ); } } - if (!draftStoppedForPreviewEdit) { - draftStream?.stop(); + if (payload.text && payload.text.length > draftMaxChars) { + logVerbose( + `telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`, + ); } + draftStream?.stop(); } const result = await deliverReplies({ ...deliveryBaseOptions, diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index c8ad4ed7c..3cc635398 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -113,4 +113,82 @@ describe("createTelegramDraftStream", () => { expect(api.deleteMessage).toHaveBeenCalledWith(123, 17); }); + + it("includes reply_to_message_id in initial sendMessage when replyToMessageId is set", async () => { + const api = { + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + replyToMessageId: 999, + }); + + stream.update("Hello"); + await vi.waitFor(() => + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 999 }), + ); + }); + + it("includes both reply_to_message_id and message_thread_id when both are set", async () => { + const api = { + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + thread: { id: 99, scope: "forum" }, + replyToMessageId: 555, + }); + + stream.update("Hello"); + await vi.waitFor(() => + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { + message_thread_id: 99, + reply_to_message_id: 555, + }), + ); + }); + + it("passes undefined params when neither thread nor replyToMessageId is set", async () => { + const api = { + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + }); + + stream.update("Hello"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined)); + }); + + it("includes reply_to_message_id even when thread resolves to general topic", async () => { + const api = { + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + thread: { id: 1, scope: "forum" }, + replyToMessageId: 888, + }); + + stream.update("Hello"); + await vi.waitFor(() => + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 888 }), + ); + }); }); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 6f9b4ada7..82ac80476 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -17,6 +17,7 @@ export function createTelegramDraftStream(params: { chatId: number; maxChars?: number; thread?: TelegramThreadSpec | null; + replyToMessageId?: number; throttleMs?: number; log?: (message: string) => void; warn?: (message: string) => void; @@ -28,6 +29,10 @@ export function createTelegramDraftStream(params: { const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); const chatId = params.chatId; const threadParams = buildTelegramThreadParams(params.thread); + const replyParams = + params.replyToMessageId != null + ? { ...threadParams, reply_to_message_id: params.replyToMessageId } + : threadParams; let streamMessageId: number | undefined; let lastSentText = ""; @@ -64,7 +69,7 @@ export function createTelegramDraftStream(params: { await params.api.editMessageText(chatId, streamMessageId, trimmed); return; } - const sent = await params.api.sendMessage(chatId, trimmed, threadParams); + const sent = await params.api.sendMessage(chatId, trimmed, replyParams); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { stopped = true;