From 3368fcf31ec0a819b3e08dc3c7bb14174be8dbf1 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sat, 3 Jan 2026 02:16:01 +0100
Subject: [PATCH] fix: avoid duplicate replies with block streaming

---
 CHANGELOG.md                                 |   1 +
 src/auto-reply/reply.block-streaming.test.ts | 103 +++++++++++++++++++
 src/auto-reply/reply.ts                      |   9 +-
 3 files changed, 110 insertions(+), 3 deletions(-)
 create mode 100644 src/auto-reply/reply.block-streaming.test.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 62e95ed92..087ebf6af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,7 @@
 ### Fixes
 - Chat UI: keep the chat scrolled to the latest message after switching sessions.
 - Auto-reply: stream completed reply blocks as soon as they finish (configurable default + break); skip empty tool-only blocks unless verbose.
+- Discord: avoid duplicate sends when block streaming is enabled (race with typing hook).
 - Providers: make outbound text chunk limits configurable via `*.textChunkLimit` (defaults remain 4000/Discord 2000).
 - CLI onboarding: persist gateway token in config so local CLI auth works; recommend auth Off unless you need multi-machine access.
 - Control UI: accept a `?token=` URL param to auto-fill Gateway auth; onboarding now opens the dashboard with token auth when configured.
diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts
new file mode 100644
index 000000000..ee9796748
--- /dev/null
+++ b/src/auto-reply/reply.block-streaming.test.ts
@@ -0,0 +1,103 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { beforeEach, describe, expect, it, vi } from "vitest";
+
+import { loadModelCatalog } from "../agents/model-catalog.js";
+import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
+import { getReplyFromConfig } from "./reply.js";
+
+vi.mock("../agents/pi-embedded.js", () => ({
+  abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
+  runEmbeddedPiAgent: vi.fn(),
+  queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
+  resolveEmbeddedSessionLane: (key: string) =>
+    `session:${key.trim() || "main"}`,
+}));
+vi.mock("../agents/model-catalog.js", () => ({
+  loadModelCatalog: vi.fn(),
+}));
+
+async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
+  const base = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-stream-"));
+  const previousHome = process.env.HOME;
+  process.env.HOME = base;
+  try {
+    return await fn(base);
+  } finally {
+    process.env.HOME = previousHome;
+    await fs.rm(base, { recursive: true, force: true });
+  }
+}
+
+describe("block streaming", () => {
+  beforeEach(() => {
+    vi.mocked(runEmbeddedPiAgent).mockReset();
+    vi.mocked(loadModelCatalog).mockResolvedValue([
+      { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
+      { id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" },
+    ]);
+  });
+
+  async function waitForCalls(fn: () => number, calls: number) {
+    const deadline = Date.now() + 1500;
+    while (fn() < calls) {
+      if (Date.now() > deadline) {
+        throw new Error(`Expected ${calls} call(s), got ${fn()}`);
+      }
+      await new Promise((resolve) => setTimeout(resolve, 5));
+    }
+  }
+
+  it("waits for block replies before returning final payloads", async () => {
+    await withTempHome(async (home) => {
+      let releaseTyping: (() => void) | undefined;
+      const typingGate = new Promise<void>((resolve) => {
+        releaseTyping = resolve;
+      });
+      const onReplyStart = vi.fn(() => typingGate);
+      const onBlockReply = vi.fn().mockResolvedValue(undefined);
+
+      vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
+        void params.onBlockReply?.({ text: "hello" });
+        return {
+          payloads: [{ text: "hello" }],
+          meta: {
+            durationMs: 5,
+            agentMeta: { sessionId: "s", provider: "p", model: "m" },
+          },
+        };
+      });
+
+      const replyPromise = getReplyFromConfig(
+        {
+          Body: "ping",
+          From: "+1004",
+          To: "+2000",
+          MessageSid: "msg-123",
+          Surface: "discord",
+        },
+        {
+          onReplyStart,
+          onBlockReply,
+        },
+        {
+          agent: {
+            model: "anthropic/claude-opus-4-5",
+            workspace: path.join(home, "clawd"),
+          },
+          whatsapp: { allowFrom: ["*"] },
+          session: { store: path.join(home, "sessions.json") },
+        },
+      );
+
+      await waitForCalls(() => onReplyStart.mock.calls.length, 1);
+      releaseTyping?.();
+
+      const res = await replyPromise;
+      expect(res).toBeUndefined();
+      expect(onBlockReply).toHaveBeenCalledTimes(1);
+    });
+  });
+});
diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts
index 6fa60d455..6a1428bea 100644
--- a/src/auto-reply/reply.ts
+++ b/src/auto-reply/reply.ts
@@ -1421,16 +1421,19 @@ export async function getReplyFromConfig(
       const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
       if (!cleaned && !hasMedia) return;
       if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return;
-      await startTypingOnText(cleaned);
       const blockPayload: ReplyPayload = {
         text: cleaned,
         mediaUrls: payload.mediaUrls,
         mediaUrl: payload.mediaUrls?.[0],
         replyToId: tagResult.replyToId,
       };
-      const task = Promise.resolve(opts.onBlockReply?.(blockPayload))
+      const payloadKey = buildPayloadKey(blockPayload);
+      const task = (async () => {
+        await startTypingOnText(cleaned);
+        await opts.onBlockReply?.(blockPayload);
+      })()
         .then(() => {
-          streamedPayloadKeys.add(buildPayloadKey(blockPayload));
+          streamedPayloadKeys.add(payloadKey);
         })
         .catch((err) => {
           logVerbose(`block reply delivery failed: ${String(err)}`);