diff --git a/CHANGELOG.md b/CHANGELOG.md index de76fbbcd..a92aba676 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Docs: https://docs.openclaw.ai - Discord: prevent duplicate media delivery when the model uses the `message send` tool with media, by skipping media extraction from messaging tool results since the tool already sent the message directly. (#18270) - Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang. - Telegram: prevent streaming final replies from being overwritten by later final/error payloads, and suppress fallback tool-error warnings when a recovered assistant answer already exists after tool calls. (#17883) Thanks @Marvae and @obviyus. +- Telegram: debounce the first draft-stream preview update (30-char threshold) and finalize short responses by editing the stop-time preview message, improving first push notifications and avoiding duplicate final sends. (#18148) Thanks @Marvae. - Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk. - Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus. - Telegram: ignore `` placeholder lines when extracting `MEDIA:` tool-result paths, preventing false local-file reads and dropped replies. (#18510) Thanks @yinghaosang. diff --git a/src/channels/draft-stream-loop.ts b/src/channels/draft-stream-loop.ts index ed492ff40..69f16c46d 100644 --- a/src/channels/draft-stream-loop.ts +++ b/src/channels/draft-stream-loop.ts @@ -9,11 +9,11 @@ export type DraftStreamLoop = { export function createDraftStreamLoop(params: { throttleMs: number; isStopped: () => boolean; - sendOrEditStreamMessage: (text: string) => Promise; + sendOrEditStreamMessage: (text: string) => Promise; }): DraftStreamLoop { let lastSentAt = 0; let pendingText = ""; - let inFlightPromise: Promise | undefined; + let inFlightPromise: Promise | undefined; let timer: ReturnType | undefined; const flush = async () => { @@ -32,14 +32,18 @@ export function createDraftStreamLoop(params: { return; } pendingText = ""; - lastSentAt = Date.now(); const current = params.sendOrEditStreamMessage(text).finally(() => { if (inFlightPromise === current) { inFlightPromise = undefined; } }); inFlightPromise = current; - await current; + const sent = await current; + if (sent === false) { + pendingText = text; + return; + } + lastSentAt = Date.now(); if (!pendingText) { return; } diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 080587a76..b079ffcec 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -1,5 +1,5 @@ -import path from "node:path"; import type { Bot } from "grammy"; +import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { STATE_DIR } from "../config/paths.js"; @@ -47,7 +47,7 @@ describe("dispatchTelegramMessage draft streaming", () => { flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockReturnValue(messageId), clear: vi.fn().mockResolvedValue(undefined), - stop: vi.fn(), + stop: vi.fn().mockResolvedValue(undefined), forceNewMessage: vi.fn(), }; } @@ -216,6 +216,33 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.stop).toHaveBeenCalled(); }); + it("edits the preview message created during stop() final flush", async () => { + let messageId: number | undefined; + const draftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => messageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockImplementation(async () => { + messageId = 777; + }), + forceNewMessage: vi.fn(), + }; + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Short final" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "777" }); + + await dispatchWithContext({ context: createContext() }); + + expect(editMessageTelegram).toHaveBeenCalledWith(123, 777, "Short final", expect.any(Object)); + expect(deliverReplies).not.toHaveBeenCalled(); + expect(draftStream.stop).toHaveBeenCalled(); + }); + it("does not overwrite finalized preview when additional final payloads are sent", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 6157b7544..df86f9ac3 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -1,4 +1,10 @@ import type { Bot } from "grammy"; +import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; +import type { RuntimeEnv } from "../runtime.js"; +import type { TelegramMessageContext } from "./bot-message-context.js"; +import type { TelegramBotOptions } from "./bot.js"; +import type { TelegramStreamMode } from "./bot/types.js"; +import type { TelegramInlineButtons } from "./button-types.js"; import { resolveAgentDir } from "../agents/agent-scope.js"; import { findModelInCatalog, @@ -15,15 +21,9 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; -import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; import { danger, logVerbose } from "../globals.js"; import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js"; -import type { RuntimeEnv } from "../runtime.js"; -import type { TelegramMessageContext } from "./bot-message-context.js"; -import type { TelegramBotOptions } from "./bot.js"; import { deliverReplies } from "./bot/delivery.js"; -import type { TelegramStreamMode } from "./bot/types.js"; -import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { editMessageTelegram } from "./send.js"; @@ -31,6 +31,9 @@ import { cacheSticker, describeStickerImage } from "./sticker-cache.js"; const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; +/** Minimum chars before sending first streaming message (improves push notification UX) */ +const DRAFT_MIN_INITIAL_CHARS = 30; + async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) { try { const catalog = await loadModelCatalog({ config: cfg }); @@ -101,6 +104,7 @@ export const dispatchTelegramMessage = async ({ maxChars: draftMaxChars, thread: threadSpec, replyToMessageId: draftReplyToMessageId, + minInitialChars: DRAFT_MIN_INITIAL_CHARS, log: logVerbose, warn: logVerbose, }) @@ -314,7 +318,7 @@ export const dispatchTelegramMessage = async ({ finalText.length <= draftMaxChars && !payload.isError; if (canFinalizeViaPreviewEdit) { - draftStream?.stop(); + await draftStream?.stop(); draftStoppedForPreviewEdit = true; if ( currentPreviewText && @@ -353,7 +357,36 @@ export const dispatchTelegramMessage = async ({ ); } if (!draftStoppedForPreviewEdit) { - draftStream?.stop(); + await draftStream?.stop(); + } + // Check if stop() sent a message (debounce released on isFinal) + // If so, edit that message instead of sending a new one + const messageIdAfterStop = draftStream?.messageId(); + if ( + !finalizedViaPreviewMessage && + typeof messageIdAfterStop === "number" && + typeof finalText === "string" && + finalText.length > 0 && + finalText.length <= draftMaxChars && + !hasMedia && + !payload.isError + ) { + try { + await editMessageTelegram(chatId, messageIdAfterStop, finalText, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, + }); + finalizedViaPreviewMessage = true; + deliveryState.delivered = true; + return; + } catch (err) { + logVerbose( + `telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`, + ); + } } } const result = await deliverReplies({ @@ -421,10 +454,11 @@ export const dispatchTelegramMessage = async ({ }, })); } finally { + // Must stop() first to flush debounced content before clear() wipes state + await draftStream?.stop(); if (!finalizedViaPreviewMessage) { await draftStream?.clear(); } - draftStream?.stop(); } let sentFallback = false; if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 9f1f2a7f8..d01a6c29c 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createTelegramDraftStream } from "./draft-stream.js"; function createMockDraftApi(sendMessageImpl?: () => Promise<{ message_id: number }>) { @@ -134,3 +134,142 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); }); + +describe("draft stream initial message debounce", () => { + const createMockApi = () => ({ + sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }); + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe("isFinal has highest priority", () => { + it("sends immediately on stop() even with 1 character", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + stream.update("Y"); + await stream.stop(); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "Y", undefined); + }); + + it("sends immediately on stop() with short sentence", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + stream.update("Ok."); + await stream.stop(); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "Ok.", undefined); + }); + }); + + describe("minInitialChars threshold", () => { + it("does not send first message below threshold", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + stream.update("Processing"); // 10 chars, below 30 + await stream.flush(); + + expect(api.sendMessage).not.toHaveBeenCalled(); + }); + + it("sends first message when reaching threshold", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + // Exactly 30 chars + stream.update("I am processing your request.."); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalled(); + }); + + it("works with longer text above threshold", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + stream.update("I am processing your request, please wait a moment"); // 50 chars + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalled(); + }); + }); + + describe("subsequent updates after first message", () => { + it("edits normally after first message is sent", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + minInitialChars: 30, + }); + + // First message at threshold (30 chars) + stream.update("I am processing your request.."); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledTimes(1); + + // Subsequent updates should edit, not wait for threshold + stream.update("I am processing your request.. and summarizing"); + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalled(); + expect(api.sendMessage).toHaveBeenCalledTimes(1); // still only 1 send + }); + }); + + describe("default behavior without debounce params", () => { + it("sends immediately without minInitialChars set (backward compatible)", async () => { + const api = createMockApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + // no minInitialChars (backward-compatible behavior) + }); + + stream.update("Hi"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hi", undefined); + }); + }); +}); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 1682413eb..1d8d8e81f 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -10,7 +10,7 @@ export type TelegramDraftStream = { flush: () => Promise; messageId: () => number | undefined; clear: () => Promise; - stop: () => void; + stop: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; }; @@ -22,6 +22,8 @@ export function createTelegramDraftStream(params: { thread?: TelegramThreadSpec | null; replyToMessageId?: number; throttleMs?: number; + /** Minimum chars before sending first message (debounce for push notifications) */ + minInitialChars?: number; log?: (message: string) => void; warn?: (message: string) => void; }): TelegramDraftStream { @@ -30,6 +32,7 @@ export function createTelegramDraftStream(params: { TELEGRAM_STREAM_MAX_CHARS, ); const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); + const minInitialChars = params.minInitialChars; const chatId = params.chatId; const threadParams = buildTelegramThreadParams(params.thread); const replyParams = @@ -40,14 +43,16 @@ export function createTelegramDraftStream(params: { let streamMessageId: number | undefined; let lastSentText = ""; let stopped = false; + let isFinal = false; - const sendOrEditStreamMessage = async (text: string) => { - if (stopped) { - return; + const sendOrEditStreamMessage = async (text: string): Promise => { + // Allow final flush even if stopped (e.g., after clear()). + if (stopped && !isFinal) { + return false; } const trimmed = text.trimEnd(); if (!trimmed) { - return; + return false; } if (trimmed.length > maxChars) { // Telegram text messages/edits cap at 4096 chars. @@ -56,40 +61,64 @@ export function createTelegramDraftStream(params: { params.warn?.( `telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`, ); - return; + return false; } if (trimmed === lastSentText) { - return; + return true; } + + // Debounce first preview send for better push notification quality. + if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) { + if (trimmed.length < minInitialChars) { + return false; + } + } + lastSentText = trimmed; try { if (typeof streamMessageId === "number") { await params.api.editMessageText(chatId, streamMessageId, trimmed); - return; + return true; } const sent = await params.api.sendMessage(chatId, trimmed, replyParams); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { stopped = true; params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); - return; + return false; } streamMessageId = Math.trunc(sentMessageId); + return true; } catch (err) { stopped = true; params.warn?.( `telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`, ); + return false; } }; + const loop = createDraftStreamLoop({ throttleMs, isStopped: () => stopped, sendOrEditStreamMessage, }); + const update = (text: string) => { + if (stopped || isFinal) { + return; + } + loop.update(text); + }; + + const stop = async (): Promise => { + isFinal = true; + await loop.flush(); + }; + const clear = async () => { - stop(); + stopped = true; + loop.stop(); await loop.waitForInFlight(); const messageId = streamMessageId; streamMessageId = undefined; @@ -105,11 +134,6 @@ export function createTelegramDraftStream(params: { } }; - const stop = () => { - stopped = true; - loop.stop(); - }; - const forceNewMessage = () => { streamMessageId = undefined; lastSentText = ""; @@ -119,7 +143,7 @@ export function createTelegramDraftStream(params: { params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { - update: loop.update, + update, flush: loop.flush, messageId: () => streamMessageId, clear,