diff --git a/CHANGELOG.md b/CHANGELOG.md index 633b1f083..240598821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai - Security/Hooks transforms: enforce symlink-safe containment for webhook transform module paths (including `hooks.transformsDir` and `hooks.mappings[].transform.module`) by resolving existing-path ancestors via realpath before import, while preserving in-root symlink support; add regression coverage for both escape and allow cases. This ships in the next npm release. Thanks @aether-ai-agent for reporting. - Telegram/WSL2: disable `autoSelectFamily` by default on WSL2 and memoize WSL2 detection in Telegram network decision logic to avoid repeated sync `/proc/version` probes on fetch/send paths. (#21916) Thanks @MizukiMachine. - Telegram/Network: default Node 22+ DNS result ordering to `ipv4first` for Telegram fetch paths and add `OPENCLAW_TELEGRAM_DNS_RESULT_ORDER`/`channels.telegram.network.dnsResultOrder` overrides to reduce IPv6-path fetch failures. (#5405) Thanks @Glucksberg. +- Telegram/Forward bursts: coalesce forwarded text+media updates through a dedicated forward lane debounce window that works with default inbound debounce config, while keeping forwarded control commands immediate. (#19476) thanks @napetrov. - Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus. - Telegram/Replies: extract forwarded-origin context from unified reply targets (`reply_to_message` and `external_reply`) so forward+comment metadata is preserved across partial reply shapes. (#9720) thanks @mcaxtr. - Telegram/Polling: persist a safe update-offset watermark bounded by pending updates so crash/restart cannot skip queued lower `update_id` updates after out-of-order completion. (#23284) thanks @frankekn. diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index bb63b2a9d..38d20d2fa 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -36,17 +36,27 @@ export function resolveInboundDebounceMs(params: { type DebounceBuffer = { items: T[]; timeout: ReturnType | null; + debounceMs: number; }; export function createInboundDebouncer(params: { debounceMs: number; buildKey: (item: T) => string | null | undefined; shouldDebounce?: (item: T) => boolean; + resolveDebounceMs?: (item: T) => number | undefined; onFlush: (items: T[]) => Promise; onError?: (err: unknown, items: T[]) => void; }) { const buffers = new Map>(); - const debounceMs = Math.max(0, Math.trunc(params.debounceMs)); + const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs)); + + const resolveDebounceMs = (item: T) => { + const resolved = params.resolveDebounceMs?.(item); + if (typeof resolved !== "number" || !Number.isFinite(resolved)) { + return defaultDebounceMs; + } + return Math.max(0, Math.trunc(resolved)); + }; const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); @@ -78,12 +88,13 @@ export function createInboundDebouncer(params: { } buffer.timeout = setTimeout(() => { void flushBuffer(key, buffer); - }, debounceMs); + }, buffer.debounceMs); buffer.timeout.unref?.(); }; const enqueue = async (item: T) => { const key = params.buildKey(item); + const debounceMs = resolveDebounceMs(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); if (!canDebounce || !key) { @@ -97,11 +108,12 @@ export function createInboundDebouncer(params: { const existing = buffers.get(key); if (existing) { existing.items.push(item); + existing.debounceMs = debounceMs; scheduleFlush(key, existing); return; } - const buffer: DebounceBuffer = { items: [item], timeout: null }; + const buffer: DebounceBuffer = { items: [item], timeout: null, debounceMs }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index a36deb4d1..aa64ce255 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -256,6 +256,29 @@ describe("createInboundDebouncer", () => { vi.useRealTimers(); }); + + it("supports per-item debounce windows when default debounce is disabled", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string; windowMs: number }>({ + debounceMs: 0, + buildKey: (item) => item.key, + resolveDebounceMs: (item) => item.windowMs, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "forward", id: "1", windowMs: 30 }); + await debouncer.enqueue({ key: "forward", id: "2", windowMs: 30 }); + + expect(calls).toEqual([]); + await vi.advanceTimersByTimeAsync(30); + expect(calls).toEqual([["1", "2"]]); + + vi.useRealTimers(); + }); }); describe("initSessionState BodyStripped", () => { diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index b625eb163..33f94331e 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -114,14 +114,33 @@ export const registerTelegramHandlers = ({ let textFragmentProcessing: Promise = Promise.resolve(); const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); + const FORWARD_BURST_DEBOUNCE_MS = 80; + type TelegramDebounceLane = "default" | "forward"; type TelegramDebounceEntry = { ctx: TelegramContext; msg: Message; allMedia: TelegramMediaRef[]; storeAllowFrom: string[]; debounceKey: string | null; + debounceLane: TelegramDebounceLane; botUsername?: string; }; + const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => { + const forwardMeta = msg as { + forward_origin?: unknown; + forward_from?: unknown; + forward_from_chat?: unknown; + forward_sender_name?: unknown; + forward_date?: unknown; + }; + return (forwardMeta.forward_origin ?? + forwardMeta.forward_from ?? + forwardMeta.forward_from_chat ?? + forwardMeta.forward_sender_name ?? + forwardMeta.forward_date) + ? "forward" + : "default"; + }; const buildSyntheticTextMessage = (params: { base: Message; text: string; @@ -148,16 +167,19 @@ export const registerTelegramHandlers = ({ }; const inboundDebouncer = createInboundDebouncer({ debounceMs, + resolveDebounceMs: (entry) => + entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs, buildKey: (entry) => entry.debounceKey, shouldDebounce: (entry) => { - if (entry.allMedia.length > 0) { - return false; - } const text = entry.msg.text ?? entry.msg.caption ?? ""; - if (!text.trim()) { + const hasText = text.trim().length > 0; + if (hasText && hasControlCommand(text, cfg, { botUsername: entry.botUsername })) { return false; } - return !hasControlCommand(text, cfg, { botUsername: entry.botUsername }); + if (entry.debounceLane === "forward") { + return true; + } + return entry.allMedia.length === 0 && hasText; }, onFlush: async (entries) => { const last = entries.at(-1); @@ -172,7 +194,8 @@ export const registerTelegramHandlers = ({ .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") .filter(Boolean) .join("\n"); - if (!combinedText.trim()) { + const combinedMedia = entries.flatMap((entry) => entry.allMedia); + if (!combinedText.trim() && combinedMedia.length === 0) { return; } const first = entries[0]; @@ -185,7 +208,7 @@ export const registerTelegramHandlers = ({ const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; await processMessage( buildSyntheticContext(baseCtx, syntheticMessage), - [], + combinedMedia, first.storeAllowFrom, messageIdOverride ? { messageIdOverride } : undefined, ); @@ -722,8 +745,9 @@ export const registerTelegramHandlers = ({ const senderId = msg.from?.id ? String(msg.from.id) : ""; const conversationKey = resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); + const debounceLane = resolveTelegramDebounceLane(msg); const debounceKey = senderId - ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}` + ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${debounceLane}` : null; await inboundDebouncer.enqueue({ ctx, @@ -731,6 +755,7 @@ export const registerTelegramHandlers = ({ allMedia, storeAllowFrom, debounceKey, + debounceLane, botUsername: ctx.me?.username, }); }; diff --git a/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts b/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts index 0dda27486..522def1ba 100644 --- a/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts +++ b/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts @@ -322,6 +322,67 @@ describe("telegram media groups", () => { ); }); +describe("telegram forwarded bursts", () => { + afterEach(() => { + vi.clearAllTimers(); + vi.useRealTimers(); + }); + + const FORWARD_BURST_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000; + + it( + "coalesces forwarded text + forwarded attachment into a single processing turn with default debounce config", + async () => { + const runtimeError = vi.fn(); + const { handler, replySpy } = await createBotHandlerWithOptions({ runtimeError }); + const fetchSpy = mockTelegramPngDownload(); + + try { + await handler({ + message: { + chat: { id: 42, type: "private" }, + from: { id: 777, is_bot: false, first_name: "N" }, + message_id: 21, + text: "Look at this", + date: 1736380800, + forward_origin: { type: "hidden_user", date: 1736380700, sender_user_name: "A" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); + + await handler({ + message: { + chat: { id: 42, type: "private" }, + from: { id: 777, is_bot: false, first_name: "N" }, + message_id: 22, + date: 1736380801, + photo: [{ file_id: "fwd_photo_1" }], + forward_origin: { type: "hidden_user", date: 1736380701, sender_user_name: "A" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/fwd1.jpg" }), + }); + + await vi.waitFor( + () => { + expect(replySpy).toHaveBeenCalledTimes(1); + }, + { timeout: FORWARD_BURST_TEST_TIMEOUT_MS, interval: 10 }, + ); + + expect(runtimeError).not.toHaveBeenCalled(); + const payload = replySpy.mock.calls[0][0]; + expect(payload.Body).toContain("Look at this"); + expect(payload.MediaPaths).toHaveLength(1); + } finally { + fetchSpy.mockRestore(); + } + }, + FORWARD_BURST_TEST_TIMEOUT_MS, + ); +}); + describe("telegram stickers", () => { const STICKER_TEST_TIMEOUT_MS = process.platform === "win32" ? 30_000 : 20_000;