From c64bcd047b1395fffdea6da22a0e6f6ac481c309 Mon Sep 17 00:00:00 2001 From: The Admiral Date: Sun, 11 Jan 2026 21:19:50 -0500 Subject: [PATCH 1/2] fix: flush block reply coalescer on tool boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When block streaming is enabled with verbose=off, tool blocks are hidden but their boundary information was lost. Text segments before and after tool execution would get coalesced into a single message because the coalescer had no signal that a tool had executed between them. This adds an onBlockReplyFlush callback that fires on tool_execution_start, allowing the block reply pipeline to flush pending text before the tool runs. This preserves natural message boundaries even when tools are hidden. Fixes the issue where: text → [hidden tool] → text → rendered as one merged message Now correctly renders as: text → [hidden tool] → text → two separate messages Co-diagnosed-by: Krill (Discord assistant) --- src/agents/pi-embedded-runner.ts | 3 + src/agents/pi-embedded-subscribe.test.ts | 92 ++++++++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 7 ++ src/auto-reply/reply/agent-runner.ts | 6 ++ 4 files changed, 108 insertions(+) diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index f8acdd185..83678ac4e 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -1313,6 +1313,8 @@ export async function runEmbeddedPiAgent(params: { mediaUrls?: string[]; audioAsVoice?: boolean; }) => void | Promise; + /** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */ + onBlockReplyFlush?: () => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; onReasoningStream?: (payload: { @@ -1669,6 +1671,7 @@ export async function runEmbeddedPiAgent(params: { onToolResult: params.onToolResult, onReasoningStream: params.onReasoningStream, onBlockReply: params.onBlockReply, + onBlockReplyFlush: params.onBlockReplyFlush, blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, onPartialReply: params.onPartialReply, diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index d07691e90..974faea4d 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -1754,4 +1754,96 @@ describe("subscribeEmbeddedPiSession", () => { expect(onToolResult).toHaveBeenCalledTimes(1); }); + + it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReplyFlush = vi.fn(); + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run-flush-test", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "text_end", + }); + + // Simulate text arriving before tool + handler?.({ + type: "message_start", + message: { role: "assistant" }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "First message before tool.", + }, + }); + + expect(onBlockReplyFlush).not.toHaveBeenCalled(); + + // Tool execution starts - should trigger flush + handler?.({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-flush-1", + args: { command: "echo hello" }, + }); + + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + + // Another tool - should flush again + handler?.({ + type: "tool_execution_start", + toolName: "read", + toolCallId: "tool-flush-2", + args: { path: "/tmp/test.txt" }, + }); + + expect(onBlockReplyFlush).toHaveBeenCalledTimes(2); + }); + + it("does not call onBlockReplyFlush when callback is not provided", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + // No onBlockReplyFlush provided + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run-no-flush", + onBlockReply, + blockReplyBreak: "text_end", + }); + + // This should not throw even without onBlockReplyFlush + expect(() => { + handler?.({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-no-flush", + args: { command: "echo test" }, + }); + }).not.toThrow(); + }); }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 9d3d325e9..f69209dd0 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -198,6 +198,8 @@ export function subscribeEmbeddedPiSession(params: { mediaUrls?: string[]; audioAsVoice?: boolean; }) => void | Promise; + /** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */ + onBlockReplyFlush?: () => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; onPartialReply?: (payload: { @@ -483,6 +485,11 @@ export function subscribeEmbeddedPiSession(params: { } if (evt.type === "tool_execution_start") { + // Flush pending block replies to preserve message boundaries before tool execution + if (params.onBlockReplyFlush) { + void params.onBlockReplyFlush(); + } + const toolName = String( (evt as AgentEvent & { toolName: string }).toolName, ); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 6b3229286..23f7f8986 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -643,6 +643,12 @@ export async function runReplyAgent(params: { blockReplyPipeline?.enqueue(blockPayload); } : undefined, + onBlockReplyFlush: + blockStreamingEnabled && blockReplyPipeline + ? async () => { + await blockReplyPipeline.flush({ force: true }); + } + : undefined, shouldEmitToolResult, onToolResult: opts?.onToolResult ? (payload) => { From 1fa7a587d6332572e5811bd93027ef81dc2bde45 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 12 Jan 2026 02:54:57 +0000 Subject: [PATCH 2/2] fix: flush block reply buffers on tool boundaries (#750) (thanks @sebslight) --- CHANGELOG.md | 1 + src/agents/pi-embedded-subscribe.test.ts | 54 ++++++++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 16 ++++++- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5930eac9c..f1a6b23ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ - Plugins: treat `plugins.load.paths` directory entries as package roots when they contain `package.json` + `clawdbot.extensions`. - Config: expand `~` in `CLAWDBOT_CONFIG_PATH` and common path-like config fields (including `plugins.load.paths`). - Auto-reply: align `/think` default display with model reasoning defaults. (#751) — thanks @gabriel-trigo. +- Auto-reply: flush block reply buffers on tool boundaries. (#750) — thanks @sebslight. - Docker: tolerate unset optional env vars in docker-setup.sh under strict mode. (#725) — thanks @petradonka. - CLI/Update: preserve base environment when passing overrides to update subprocesses. (#713) — thanks @danielz1z. - Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44. diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 974faea4d..2b8afa6fb 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -1815,6 +1815,60 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(2); }); + it("flushes buffered block chunks before tool execution", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + const onBlockReplyFlush = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run-flush-buffer", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 50, maxChars: 200 }, + }); + + handler?.({ + type: "message_start", + message: { role: "assistant" }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Short chunk.", + }, + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + + handler?.({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-flush-buffer-1", + args: { command: "echo flush" }, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk."); + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan( + onBlockReplyFlush.mock.invocationCallOrder[0], + ); + }); + it("does not call onBlockReplyFlush when callback is not provided", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index f69209dd0..9b9b04f68 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -446,6 +446,19 @@ export function subscribeEmbeddedPiSession(params: { }); }; + const flushBlockReplyBuffer = () => { + if (!params.onBlockReply) return; + if (blockChunker?.hasBuffered()) { + blockChunker.drain({ force: true, emit: emitBlockChunk }); + blockChunker.reset(); + return; + } + if (blockBuffer.length > 0) { + emitBlockChunk(blockBuffer); + blockBuffer = ""; + } + }; + const emitReasoningStream = (text: string) => { if (!streamReasoning || !params.onReasoningStream) return; const formatted = formatReasoningMessage(text); @@ -485,7 +498,8 @@ export function subscribeEmbeddedPiSession(params: { } if (evt.type === "tool_execution_start") { - // Flush pending block replies to preserve message boundaries before tool execution + // Flush pending block replies to preserve message boundaries before tool execution. + flushBlockReplyBuffer(); if (params.onBlockReplyFlush) { void params.onBlockReplyFlush(); }