From 6edb512efaebcc7992baa9a30c1d1d0c44e32ea7 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 2 Mar 2026 21:56:59 +0530 Subject: [PATCH] feat(telegram): use sendMessageDraft for private chat streaming (#31824) * feat(telegram): use sendMessageDraft for private stream previews * test(telegram): cover DM draft id rotation race * fix(telegram): keep DM reasoning updates in draft preview * fix(telegram): split DM reasoning preview transport * fix(telegram): harden DM draft preview fallback paths * style(telegram): normalize draft preview formatting --- src/telegram/bot-message-dispatch.test.ts | 137 ++++++++++++- src/telegram/bot-message-dispatch.ts | 3 + .../bot.create-telegram-bot.test-harness.ts | 5 + src/telegram/draft-stream.test.ts | 120 ++++++++++- src/telegram/draft-stream.ts | 186 ++++++++++++++---- src/telegram/lane-delivery.ts | 19 ++ 6 files changed, 431 insertions(+), 39 deletions(-) diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 42f2317d2..5104c7c05 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -53,10 +53,15 @@ describe("dispatchTelegramMessage draft streaming", () => { }); function createDraftStream(messageId?: number) { + let previewRevision = 0; return { - update: vi.fn(), - flush: vi.fn().mockResolvedValue(undefined), + update: vi.fn().mockImplementation(() => { + previewRevision += 1; + }), + flush: vi.fn().mockResolvedValue(true), messageId: vi.fn().mockReturnValue(messageId), + previewMode: vi.fn().mockReturnValue("message"), + previewRevision: vi.fn().mockImplementation(() => previewRevision), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), forceNewMessage: vi.fn(), @@ -66,14 +71,18 @@ describe("dispatchTelegramMessage draft streaming", () => { function createSequencedDraftStream(startMessageId = 1001) { let activeMessageId: number | undefined; let nextMessageId = startMessageId; + let previewRevision = 0; return { update: vi.fn().mockImplementation(() => { if (activeMessageId == null) { activeMessageId = nextMessageId++; } + previewRevision += 1; }), - flush: vi.fn().mockResolvedValue(undefined), + flush: vi.fn().mockResolvedValue(true), messageId: vi.fn().mockImplementation(() => activeMessageId), + previewMode: vi.fn().mockReturnValue("message"), + previewRevision: vi.fn().mockImplementation(() => previewRevision), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), forceNewMessage: vi.fn().mockImplementation(() => { @@ -1084,6 +1093,36 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); + it("uses message preview transport for DM reasoning lane when answer preview lane is active", async () => { + setupDraftStreams({ answerMessageId: 999, reasoningMessageId: 111 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" }); + await replyOptions?.onPartialReply?.({ text: "Checking the directory..." }); + await dispatcherOptions.deliver({ text: "Checking the directory..." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" }); + + expect(createTelegramDraftStream).toHaveBeenCalledTimes(2); + expect(createTelegramDraftStream.mock.calls[0]?.[0]).toEqual( + expect.objectContaining({ + thread: { id: 777, scope: "dm" }, + previewTransport: "auto", + }), + ); + expect(createTelegramDraftStream.mock.calls[1]?.[0]).toEqual( + expect.objectContaining({ + thread: { id: 777, scope: "dm" }, + previewTransport: "message", + }), + ); + }); + it("keeps reasoning and answer streaming in separate preview lanes", async () => { const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ answerMessageId: 999, @@ -1218,6 +1257,98 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("keeps DM draft reasoning block updates in preview flow without sending duplicates", async () => { + const answerDraftStream = createDraftStream(999); + let previewRevision = 0; + const reasoningDraftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(true), + messageId: vi.fn().mockReturnValue(undefined), + previewMode: vi.fn().mockReturnValue("draft"), + previewRevision: vi.fn().mockImplementation(() => previewRevision), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn(), + }; + reasoningDraftStream.update.mockImplementation(() => { + previewRevision += 1; + }); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ + text: "Reasoning:\nI am counting letters...", + }); + await replyOptions?.onReasoningEnd?.(); + await replyOptions?.onPartialReply?.({ text: "3" }); + await dispatcherOptions.deliver({ text: "3" }, { kind: "final" }); + await dispatcherOptions.deliver( + { + text: "Reasoning:\nI am counting letters. The total is 3.", + }, + { kind: "block" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object)); + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\nI am counting letters. The total is 3.", + ); + expect(reasoningDraftStream.flush).toHaveBeenCalled(); + expect(deliverReplies).not.toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: expect.stringContaining("Reasoning:\nI am") })], + }), + ); + }); + + it("falls back to normal send when DM draft reasoning flush emits no preview update", async () => { + const answerDraftStream = createDraftStream(999); + const previewRevision = 0; + const reasoningDraftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(false), + messageId: vi.fn().mockReturnValue(undefined), + previewMode: vi.fn().mockReturnValue("draft"), + previewRevision: vi.fn().mockReturnValue(previewRevision), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn(), + }; + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step one_" }); + await replyOptions?.onReasoningEnd?.(); + await dispatcherOptions.deliver( + { text: "Reasoning:\n_step one expanded_" }, + { kind: "block" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.flush).toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Reasoning:\n_step one expanded_" })], + }), + ); + }); + it("routes think-tag partials to reasoning lane and keeps answer lane clean", async () => { const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ answerMessageId: 999, diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 094f9b5ff..5a7d795c1 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -190,12 +190,15 @@ export const dispatchTelegramMessage = async ({ const archivedAnswerPreviews: ArchivedPreview[] = []; const archivedReasoningPreviewIds: number[] = []; const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => { + const useMessagePreviewTransportForDmReasoning = + laneName === "reasoning" && threadSpec?.scope === "dm" && canStreamAnswerDraft; const stream = enabled ? createTelegramDraftStream({ api: bot.api, chatId, maxChars: draftMaxChars, thread: threadSpec, + previewTransport: useMessagePreviewTransportForDmReasoning ? "message" : "auto", replyToMessageId: draftReplyToMessageId, minInitialChars: draftMinInitialChars, renderText: renderDraftPreview, diff --git a/src/telegram/bot.create-telegram-bot.test-harness.ts b/src/telegram/bot.create-telegram-bot.test-harness.ts index 15e6bb10b..122ef973a 100644 --- a/src/telegram/bot.create-telegram-bot.test-harness.ts +++ b/src/telegram/bot.create-telegram-bot.test-harness.ts @@ -111,6 +111,7 @@ export const botCtorSpy: AnyMock = vi.fn(); export const answerCallbackQuerySpy: AnyAsyncMock = vi.fn(async () => undefined); export const sendChatActionSpy: AnyMock = vi.fn(); export const editMessageTextSpy: AnyAsyncMock = vi.fn(async () => ({ message_id: 88 })); +export const sendMessageDraftSpy: AnyAsyncMock = vi.fn(async () => true); export const setMessageReactionSpy: AnyAsyncMock = vi.fn(async () => undefined); export const setMyCommandsSpy: AnyAsyncMock = vi.fn(async () => undefined); export const getMeSpy: AnyAsyncMock = vi.fn(async () => ({ @@ -127,6 +128,7 @@ type ApiStub = { answerCallbackQuery: typeof answerCallbackQuerySpy; sendChatAction: typeof sendChatActionSpy; editMessageText: typeof editMessageTextSpy; + sendMessageDraft: typeof sendMessageDraftSpy; setMessageReaction: typeof setMessageReactionSpy; setMyCommands: typeof setMyCommandsSpy; getMe: typeof getMeSpy; @@ -141,6 +143,7 @@ const apiStub: ApiStub = { answerCallbackQuery: answerCallbackQuerySpy, sendChatAction: sendChatActionSpy, editMessageText: editMessageTextSpy, + sendMessageDraft: sendMessageDraftSpy, setMessageReaction: setMessageReactionSpy, setMyCommands: setMyCommandsSpy, getMe: getMeSpy, @@ -311,6 +314,8 @@ beforeEach(() => { }); editMessageTextSpy.mockReset(); editMessageTextSpy.mockResolvedValue({ message_id: 88 }); + sendMessageDraftSpy.mockReset(); + sendMessageDraftSpy.mockResolvedValue(true); enqueueSystemEventSpy.mockReset(); wasSentByBot.mockReset(); wasSentByBot.mockReturnValue(false); diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 0bdbf4dd0..aa5a53ed8 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -7,6 +7,7 @@ type TelegramDraftStreamParams = Parameters[0] function createMockDraftApi(sendMessageImpl?: () => Promise<{ message_id: number }>) { return { sendMessage: vi.fn(sendMessageImpl ?? (async () => ({ message_id: 17 }))), + sendMessageDraft: vi.fn().mockResolvedValue(true), editMessageText: vi.fn().mockResolvedValue(true), deleteMessage: vi.fn().mockResolvedValue(true), }; @@ -107,17 +108,130 @@ describe("createTelegramDraftStream", () => { await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined)); }); - it("includes message_thread_id for dm threads and clears preview on cleanup", async () => { + it("uses sendMessageDraft for dm threads and does not create a preview message", async () => { const api = createMockDraftApi(); const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" }); stream.update("Hello"); await vi.waitFor(() => - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }), + expect(api.sendMessageDraft).toHaveBeenCalledWith(123, expect.any(Number), "Hello", { + message_thread_id: 42, + }), ); + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); await stream.clear(); - expect(api.deleteMessage).toHaveBeenCalledWith(123, 17); + expect(api.deleteMessage).not.toHaveBeenCalled(); + }); + + it("supports forcing message transport in dm threads", async () => { + const api = createMockDraftApi(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "message", + }); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }); + expect(api.sendMessageDraft).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); + }); + + it("falls back to message transport when sendMessageDraft is unavailable", async () => { + const api = createMockDraftApi(); + delete (api as { sendMessageDraft?: unknown }).sendMessageDraft; + const warn = vi.fn(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + warn, + }); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }); + expect(api.editMessageText).not.toHaveBeenCalled(); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText", + ); + }); + + it("retries DM message preview send without thread when thread is not found", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockRejectedValueOnce(new Error("400: Bad Request: message thread not found")) + .mockResolvedValueOnce({ message_id: 17 }); + const warn = vi.fn(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "message", + warn, + }); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello", { message_thread_id: 42 }); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Hello", undefined); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview send failed with message_thread_id, retrying without thread", + ); + }); + + it("does not edit or delete messages after DM draft stream finalization", async () => { + const api = createMockDraftApi(); + const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" }); + + stream.update("Hello"); + await stream.flush(); + stream.update("Hello again"); + await stream.stop(); + await stream.clear(); + + expect(api.sendMessageDraft).toHaveBeenCalled(); + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); + expect(api.deleteMessage).not.toHaveBeenCalled(); + }); + + it("rotates draft_id when forceNewMessage races an in-flight DM draft send", async () => { + let resolveFirstDraft: ((value: boolean) => void) | undefined; + const firstDraftSend = new Promise((resolve) => { + resolveFirstDraft = resolve; + }); + const api = { + sendMessageDraft: vi.fn().mockReturnValueOnce(firstDraftSend).mockResolvedValueOnce(true), + sendMessage: vi.fn().mockResolvedValue({ message_id: 17 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createThreadedDraftStream( + api as unknown as ReturnType, + { id: 42, scope: "dm" }, + ); + + stream.update("Message A"); + await vi.waitFor(() => expect(api.sendMessageDraft).toHaveBeenCalledTimes(1)); + + stream.forceNewMessage(); + stream.update("Message B"); + + resolveFirstDraft?.(true); + await stream.flush(); + + expect(api.sendMessageDraft).toHaveBeenCalledTimes(2); + const firstDraftId = api.sendMessageDraft.mock.calls[0]?.[1]; + const secondDraftId = api.sendMessageDraft.mock.calls[1]?.[1]; + expect(typeof firstDraftId).toBe("number"); + expect(typeof secondDraftId).toBe("number"); + expect(firstDraftId).not.toBe(secondDraftId); + expect(api.sendMessageDraft.mock.calls[1]?.[2]).toBe("Message B"); + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); }); it("creates new message after forceNewMessage is called", async () => { diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 87b45f2c8..97e076752 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -4,11 +4,41 @@ import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helper const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; +const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647; +const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; + +type TelegramSendMessageDraft = ( + chatId: number, + draftId: number, + text: string, + params?: { + message_thread_id?: number; + parse_mode?: "HTML"; + }, +) => Promise; + +let nextDraftId = 0; + +function allocateTelegramDraftId(): number { + nextDraftId = nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : nextDraftId + 1; + return nextDraftId; +} + +function resolveSendMessageDraftApi(api: Bot["api"]): TelegramSendMessageDraft | undefined { + const sendMessageDraft = (api as Bot["api"] & { sendMessageDraft?: TelegramSendMessageDraft }) + .sendMessageDraft; + if (typeof sendMessageDraft !== "function") { + return undefined; + } + return sendMessageDraft.bind(api as object); +} export type TelegramDraftStream = { update: (text: string) => void; flush: () => Promise; messageId: () => number | undefined; + previewMode?: () => "message" | "draft"; + previewRevision?: () => number; clear: () => Promise; stop: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ @@ -31,6 +61,7 @@ export function createTelegramDraftStream(params: { chatId: number; maxChars?: number; thread?: TelegramThreadSpec | null; + previewTransport?: "auto" | "message" | "draft"; replyToMessageId?: number; throttleMs?: number; /** Minimum chars before sending first message (debounce for push notifications) */ @@ -49,17 +80,126 @@ export function createTelegramDraftStream(params: { const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); const minInitialChars = params.minInitialChars; const chatId = params.chatId; + const requestedPreviewTransport = params.previewTransport ?? "auto"; + const prefersDraftTransport = + requestedPreviewTransport === "draft" + ? true + : requestedPreviewTransport === "message" + ? false + : params.thread?.scope === "dm"; const threadParams = buildTelegramThreadParams(params.thread); const replyParams = params.replyToMessageId != null ? { ...threadParams, reply_to_message_id: params.replyToMessageId } : threadParams; + const resolvedDraftApi = prefersDraftTransport + ? resolveSendMessageDraftApi(params.api) + : undefined; + const usesDraftTransport = Boolean(prefersDraftTransport && resolvedDraftApi); + if (prefersDraftTransport && !usesDraftTransport) { + params.warn?.( + "telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText", + ); + } const streamState = { stopped: false, final: false }; let streamMessageId: number | undefined; + let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined; let lastSentText = ""; let lastSentParseMode: "HTML" | undefined; + let previewRevision = 0; let generation = 0; + const sendStreamPreview = usesDraftTransport + ? async ({ + renderedText, + renderedParseMode, + }: { + renderedText: string; + renderedParseMode: "HTML" | undefined; + sendGeneration: number; + }): Promise => { + const draftId = streamDraftId ?? allocateTelegramDraftId(); + streamDraftId = draftId; + const draftParams = { + ...(threadParams?.message_thread_id != null + ? { message_thread_id: threadParams.message_thread_id } + : {}), + ...(renderedParseMode ? { parse_mode: renderedParseMode } : {}), + }; + await resolvedDraftApi!( + chatId, + draftId, + renderedText, + Object.keys(draftParams).length > 0 ? draftParams : undefined, + ); + return true; + } + : async ({ + renderedText, + renderedParseMode, + sendGeneration, + }: { + renderedText: string; + renderedParseMode: "HTML" | undefined; + sendGeneration: number; + }): Promise => { + if (typeof streamMessageId === "number") { + if (renderedParseMode) { + await params.api.editMessageText(chatId, streamMessageId, renderedText, { + parse_mode: renderedParseMode, + }); + } else { + await params.api.editMessageText(chatId, streamMessageId, renderedText); + } + return true; + } + const sendParams = renderedParseMode + ? { + ...replyParams, + parse_mode: renderedParseMode, + } + : replyParams; + let sent; + try { + sent = await params.api.sendMessage(chatId, renderedText, sendParams); + } catch (err) { + const hasThreadParam = + "message_thread_id" in (sendParams ?? {}) && + typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number"; + if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) { + throw err; + } + const threadlessParams = { + ...(sendParams as Record), + }; + delete threadlessParams.message_thread_id; + params.warn?.( + "telegram stream preview send failed with message_thread_id, retrying without thread", + ); + sent = await params.api.sendMessage( + chatId, + renderedText, + Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined, + ); + } + const sentMessageId = sent?.message_id; + if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { + streamState.stopped = true; + params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); + return false; + } + const normalizedMessageId = Math.trunc(sentMessageId); + if (sendGeneration !== generation) { + params.onSupersededPreview?.({ + messageId: normalizedMessageId, + textSnapshot: renderedText, + parseMode: renderedParseMode, + }); + return true; + } + streamMessageId = normalizedMessageId; + return true; + }; const sendOrEditStreamMessage = async (text: string): Promise => { // Allow final flush even if stopped (e.g., after clear()). @@ -100,40 +240,15 @@ export function createTelegramDraftStream(params: { lastSentText = renderedText; lastSentParseMode = renderedParseMode; try { - if (typeof streamMessageId === "number") { - if (renderedParseMode) { - await params.api.editMessageText(chatId, streamMessageId, renderedText, { - parse_mode: renderedParseMode, - }); - } else { - await params.api.editMessageText(chatId, streamMessageId, renderedText); - } - return true; + const sent = await sendStreamPreview({ + renderedText, + renderedParseMode, + sendGeneration, + }); + if (sent) { + previewRevision += 1; } - const sendParams = renderedParseMode - ? { - ...replyParams, - parse_mode: renderedParseMode, - } - : replyParams; - const sent = await params.api.sendMessage(chatId, renderedText, sendParams); - const sentMessageId = sent?.message_id; - if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { - streamState.stopped = true; - params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); - return false; - } - const normalizedMessageId = Math.trunc(sentMessageId); - if (sendGeneration !== generation) { - params.onSupersededPreview?.({ - messageId: normalizedMessageId, - textSnapshot: renderedText, - parseMode: renderedParseMode, - }); - return true; - } - streamMessageId = normalizedMessageId; - return true; + return sent; } catch (err) { streamState.stopped = true; params.warn?.( @@ -166,6 +281,9 @@ export function createTelegramDraftStream(params: { const forceNewMessage = () => { generation += 1; streamMessageId = undefined; + if (usesDraftTransport) { + streamDraftId = allocateTelegramDraftId(); + } lastSentText = ""; lastSentParseMode = undefined; loop.resetPending(); @@ -178,6 +296,8 @@ export function createTelegramDraftStream(params: { update, flush: loop.flush, messageId: () => streamMessageId, + previewMode: () => (usesDraftTransport ? "draft" : "message"), + previewRevision: () => previewRevision, clear, stop, forceNewMessage, diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 8e2bce0cc..b334c6ded 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -103,6 +103,7 @@ type ConsumeArchivedAnswerPreviewParams = { export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; + const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft"; const shouldSkipRegressivePreviewUpdate = (args: { currentPreviewText: string | undefined; @@ -337,6 +338,24 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } if (allowPreviewUpdateForNonFinal && canEditViaPreview) { + if (isDraftPreviewLane(lane)) { + // DM draft flow has no message_id to edit; updates are sent via sendMessageDraft. + // Only mark as updated when the draft flush actually emits an update. + const previewRevisionBeforeFlush = lane.stream?.previewRevision?.() ?? 0; + lane.stream?.update(text); + await params.flushDraftLane(lane); + const previewUpdated = (lane.stream?.previewRevision?.() ?? 0) > previewRevisionBeforeFlush; + if (!previewUpdated) { + params.log( + `telegram: ${laneName} draft preview update not emitted; falling back to standard send`, + ); + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + } + lane.lastPartialText = text; + params.markDelivered(); + return "preview-updated"; + } const updated = await tryUpdatePreviewForLane({ lane, laneName,