diff --git a/CHANGELOG.md b/CHANGELOG.md index e59e1fed5..770f9a8b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -318,6 +318,7 @@ Docs: https://docs.openclaw.ai - Exec approvals/gateway-node policy: honor explicit `ask=off` from `exec-approvals.json` even when runtime defaults are stricter, so trusted full/off setups stop re-prompting on gateway and node exec paths. Landed from contributor PR #26789 by @pandego. Thanks @pandego. - Exec approvals/config fallback: inherit `ask` from `exec-approvals.json` when `tools.exec.ask` is unset, so local full/off defaults no longer fall back to `on-miss` for exec tool and `nodes run`. Landed from contributor PR #29187 by @Bartok9. Thanks @Bartok9. - Exec approvals/allow-always shell scripts: persist and match script paths for wrapper invocations like `bash scripts/foo.sh` while still blocking `-c`/`-s` wrapper bypasses. Landed from contributor PR #35137 by @yuweuii. Thanks @yuweuii. +- Queue/followup dedupe across drain restarts: dedupe queued redelivery `message_id` values after queue recreation so busy-session followups no longer duplicate on replayed inbound events. Landed from contributor PR #33168 by @rylena. Thanks @rylena. ## 2026.3.2 diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 3d0ddb371..b097b6c51 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -2,7 +2,11 @@ export { extractQueueDirective } from "./queue/directive.js"; export { clearSessionQueues } from "./queue/cleanup.js"; export type { ClearSessionQueueResult } from "./queue/cleanup.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; -export { enqueueFollowupRun, getFollowupQueueDepth } from "./queue/enqueue.js"; +export { + enqueueFollowupRun, + getFollowupQueueDepth, + resetRecentQueuedMessageIdDedupe, +} from "./queue/enqueue.js"; export { resolveQueueSettings } from "./queue/settings.js"; export { clearFollowupQueue } from "./queue/state.js"; export type { diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 1d5849237..d6481d2e9 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,8 +1,28 @@ +import { createDedupeCache } from "../../../infra/dedupe.js"; import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; import { kickFollowupDrainIfIdle } from "./drain.js"; import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; +const RECENT_QUEUE_MESSAGE_IDS = createDedupeCache({ + ttlMs: 5 * 60 * 1000, + maxSize: 10_000, +}); + +function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined { + const messageId = run.messageId?.trim(); + if (!messageId) { + return undefined; + } + const route = [ + run.originatingChannel ?? "", + run.originatingTo ?? "", + run.originatingAccountId ?? "", + run.originatingThreadId == null ? "" : String(run.originatingThreadId), + ].join("|"); + return `${queueKey}|${route}|${messageId}`; +} + function isRunAlreadyQueued( run: FollowupRun, items: FollowupRun[], @@ -31,6 +51,11 @@ export function enqueueFollowupRun( dedupeMode: QueueDedupeMode = "message-id", ): boolean { const queue = getFollowupQueue(key, settings); + const recentMessageIdKey = dedupeMode !== "none" ? buildRecentMessageIdKey(run, key) : undefined; + if (recentMessageIdKey && RECENT_QUEUE_MESSAGE_IDS.peek(recentMessageIdKey)) { + return false; + } + const dedupe = dedupeMode === "none" ? undefined @@ -54,6 +79,9 @@ export function enqueueFollowupRun( } queue.items.push(run); + if (recentMessageIdKey) { + RECENT_QUEUE_MESSAGE_IDS.check(recentMessageIdKey); + } // If drain finished and deleted the queue before this item arrived, a new queue // object was created (draining: false) but nobody scheduled a drain for it. // Use the cached callback to restart the drain now. @@ -70,3 +98,7 @@ export function getFollowupQueueDepth(key: string): number { } return queue.items.length; } + +export function resetRecentQueuedMessageIdDedupe(): void { + RECENT_QUEUE_MESSAGE_IDS.clear(); +} diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 2842924b2..93ff85ce1 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js"; import type { OpenClawConfig } from "../../config/config.js"; import { defaultRuntime } from "../../runtime.js"; @@ -8,7 +8,11 @@ import { finalizeInboundContext } from "./inbound-context.js"; import { normalizeInboundTextNewlines } from "./inbound-text.js"; import { parseLineDirectives, hasLineDirectives } from "./line-directives.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; -import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js"; +import { + enqueueFollowupRun, + resetRecentQueuedMessageIdDedupe, + scheduleFollowupDrain, +} from "./queue.js"; import { createReplyDispatcher } from "./reply-dispatcher.js"; import { createReplyToModeFilter, resolveReplyToMode } from "./reply-threading.js"; @@ -627,6 +631,10 @@ function createRun(params: { } describe("followup queue deduplication", () => { + beforeEach(() => { + resetRecentQueuedMessageIdDedupe(); + }); + it("deduplicates messages with same Discord message_id", async () => { const key = `test-dedup-message-id-${Date.now()}`; const calls: FollowupRun[] = []; @@ -690,6 +698,51 @@ describe("followup queue deduplication", () => { expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]"); }); + it("deduplicates same message_id after queue drain restarts", async () => { + const key = `test-dedup-after-drain-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "first", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ); + expect(first).toBe(true); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + const redelivery = enqueueFollowupRun( + key, + createRun({ + prompt: "first-redelivery", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ); + + expect(redelivery).toBe(false); + expect(calls).toHaveLength(1); + }); + it("deduplicates exact prompt when routing matches and no message id", async () => { const key = `test-dedup-whatsapp-${Date.now()}`; const settings: QueueSettings = {