From c3a1fe01ae29d499fc283d26df8d7c34ef2bbc7f Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 13:33:57 +0100 Subject: [PATCH] ACP: make final_only defer all projected output --- src/auto-reply/reply/acp-projector.test.ts | 99 ++++++++++++++++++- src/auto-reply/reply/acp-projector.ts | 50 +++++++--- .../reply/acp-stream-settings.test.ts | 14 +++ src/auto-reply/reply/acp-stream-settings.ts | 5 +- src/config/schema.help.ts | 2 +- ...nt-dedup-implementation-plan-2026-03-01.md | 1 + 6 files changed, 151 insertions(+), 20 deletions(-) diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 0ebee2ccb..755122e51 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -28,7 +28,7 @@ describe("createAcpReplyProjector", () => { ]); }); - it("supports deliveryMode=final_only by buffering deltas until done", async () => { + it("supports deliveryMode=final_only by buffering all projected output until done", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ cfg: createCfg({ @@ -38,6 +38,9 @@ describe("createAcpReplyProjector", () => { coalesceIdleMs: 0, maxChunkChars: 512, deliveryMode: "final_only", + tagVisibility: { + available_commands_update: true, + }, }, }, }), @@ -53,6 +56,19 @@ describe("createAcpReplyProjector", () => { text: "What", tag: "agent_message_chunk", }); + await projector.onEvent({ + type: "status", + text: "available commands updated (7)", + tag: "available_commands_update", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call_1", + status: "in_progress", + title: "List files", + text: "List files (in_progress)", + }); await projector.onEvent({ type: "text_delta", text: " now?", @@ -61,7 +77,62 @@ describe("createAcpReplyProjector", () => { expect(deliveries).toEqual([]); await projector.onEvent({ type: "done" }); - expect(deliveries).toEqual([{ kind: "block", text: "What now?" }]); + expect(deliveries).toHaveLength(3); + expect(deliveries[0]).toEqual({ + kind: "tool", + text: prefixSystemMessage("available commands updated (7)"), + }); + expect(deliveries[1]?.kind).toBe("tool"); + expect(deliveries[1]?.text).toContain("Tool Call"); + expect(deliveries[2]).toEqual({ kind: "block", text: "What now?" }); + }); + + it("flushes buffered status/tool output on error in deliveryMode=final_only", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 512, + deliveryMode: "final_only", + tagVisibility: { + available_commands_update: true, + }, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "status", + text: "available commands updated (7)", + tag: "available_commands_update", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call_2", + status: "in_progress", + title: "Run tests", + text: "Run tests (in_progress)", + }); + expect(deliveries).toEqual([]); + + await projector.onEvent({ type: "error", message: "turn failed" }); + expect(deliveries).toHaveLength(2); + expect(deliveries[0]).toEqual({ + kind: "tool", + text: prefixSystemMessage("available commands updated (7)"), + }); + expect(deliveries[1]?.kind).toBe("tool"); + expect(deliveries[1]?.text).toContain("Tool Call"); }); it("suppresses usage_update by default and allows deduped usage when tag-visible", async () => { @@ -91,6 +162,7 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 64, + deliveryMode: "live", tagVisibility: { usage_update: true, }, @@ -142,7 +214,6 @@ describe("createAcpReplyProjector", () => { return true; }, }); - await projector.onEvent({ type: "status", text: "available commands updated (7)", @@ -155,7 +226,14 @@ describe("createAcpReplyProjector", () => { it("dedupes repeated tool lifecycle updates when repeatSuppression is enabled", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ - cfg: createCfg(), + cfg: createCfg({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + }, + }, + }), shouldSendToolSummaries: true, deliver: async (kind, payload) => { deliveries.push({ kind, text: payload.text }); @@ -206,7 +284,14 @@ describe("createAcpReplyProjector", () => { it("renders fallback tool labels without leaking call ids as primary label", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ - cfg: createCfg(), + cfg: createCfg({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + }, + }, + }), shouldSendToolSummaries: true, deliver: async (kind, payload) => { deliveries.push({ kind, text: payload.text }); @@ -235,6 +320,7 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 256, + deliveryMode: "live", repeatSuppression: false, tagVisibility: { available_commands_update: true, @@ -303,6 +389,7 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 256, + deliveryMode: "live", tagVisibility: { available_commands_update: true, }, @@ -388,6 +475,7 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 256, + deliveryMode: "live", maxMetaEventsPerTurn: 1, tagVisibility: { usage_update: true, @@ -438,6 +526,7 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 256, + deliveryMode: "live", tagVisibility: { tool_call_update: false, }, diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 14ed1571e..ede78abca 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -29,6 +29,11 @@ type ToolLifecycleState = { lastRenderedHash?: string; }; +type BufferedToolDelivery = { + payload: ReplyPayload; + meta?: AcpProjectedDeliveryMeta; +}; + function truncateText(input: string, maxChars: number): string { if (input.length <= maxChars) { return input; @@ -109,6 +114,7 @@ export function createAcpReplyProjector(params: { let lastStatusHash: string | undefined; let lastToolHash: string | undefined; let lastUsageTuple: string | undefined; + const pendingToolDeliveries: BufferedToolDelivery[] = []; const toolLifecycleById = new Map(); const resetTurnState = () => { @@ -118,6 +124,7 @@ export function createAcpReplyProjector(params: { lastStatusHash = undefined; lastToolHash = undefined; lastUsageTuple = undefined; + pendingToolDeliveries.length = 0; toolLifecycleById.clear(); }; @@ -133,7 +140,17 @@ export function createAcpReplyProjector(params: { }); }; + const flushBufferedToolDeliveries = async (force: boolean) => { + if (!(settings.deliveryMode === "final_only" && force)) { + return; + } + for (const entry of pendingToolDeliveries.splice(0, pendingToolDeliveries.length)) { + await params.deliver("tool", entry.payload, entry.meta); + } + }; + const flush = async (force = false): Promise => { + await flushBufferedToolDeliveries(force); drainChunker(force); await blockReplyPipeline.flush({ force }); }; @@ -170,10 +187,15 @@ export function createAcpReplyProjector(params: { if (!consumeMetaQuota(opts?.force === true)) { return; } - if (settings.deliveryMode === "live") { + if (settings.deliveryMode === "final_only") { + pendingToolDeliveries.push({ + payload: { text: formatted }, + meta, + }); + } else { await flush(true); + await params.deliver("tool", { text: formatted }, meta); } - await params.deliver("tool", { text: formatted }, meta); lastStatusHash = hash; }; @@ -226,19 +248,21 @@ export function createAcpReplyProjector(params: { if (!consumeMetaQuota(opts?.force === true)) { return; } - if (settings.deliveryMode === "live") { + const deliveryMeta: AcpProjectedDeliveryMeta = { + ...(event.tag ? { tag: event.tag } : {}), + ...(toolCallId ? { toolCallId } : {}), + ...(status ? { toolStatus: status } : {}), + allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"), + }; + if (settings.deliveryMode === "final_only") { + pendingToolDeliveries.push({ + payload: { text: toolSummary }, + meta: deliveryMeta, + }); + } else { await flush(true); + await params.deliver("tool", { text: toolSummary }, deliveryMeta); } - await params.deliver( - "tool", - { text: toolSummary }, - { - ...(event.tag ? { tag: event.tag } : {}), - ...(toolCallId ? { toolCallId } : {}), - ...(status ? { toolStatus: status } : {}), - allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"), - }, - ); lastToolHash = hash; }; diff --git a/src/auto-reply/reply/acp-stream-settings.test.ts b/src/auto-reply/reply/acp-stream-settings.test.ts index fb87da28d..918136d85 100644 --- a/src/auto-reply/reply/acp-stream-settings.test.ts +++ b/src/auto-reply/reply/acp-stream-settings.test.ts @@ -39,6 +39,20 @@ describe("acp stream settings", () => { expect(settings.tagVisibility.usage_update).toBe(true); }); + it("accepts explicit deliveryMode=live override", () => { + const settings = resolveAcpProjectionSettings( + createAcpTestConfig({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + }, + }, + }), + ); + expect(settings.deliveryMode).toBe("live"); + }); + it("uses default tag visibility when no override is provided", () => { const settings = resolveAcpProjectionSettings(createAcpTestConfig()); expect(isAcpTagVisible(settings, "tool_call")).toBe(true); diff --git a/src/auto-reply/reply/acp-stream-settings.ts b/src/auto-reply/reply/acp-stream-settings.ts index 64843530d..634df888d 100644 --- a/src/auto-reply/reply/acp-stream-settings.ts +++ b/src/auto-reply/reply/acp-stream-settings.ts @@ -59,7 +59,10 @@ function clampBoolean(value: unknown, fallback: boolean): boolean { } function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode { - return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE; + if (value === "live" || value === "final_only") { + return value; + } + return DEFAULT_ACP_DELIVERY_MODE; } function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 02f993ae2..44ae8bab3 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -175,7 +175,7 @@ export const FIELD_HELP: Record = { "acp.stream.repeatSuppression": "When true (default), suppress repeated ACP status/tool projection lines in a turn while keeping raw ACP events unchanged.", "acp.stream.deliveryMode": - "ACP delivery style: live streams block chunks incrementally, final_only buffers text deltas until terminal turn events.", + "ACP delivery style: live streams projected output incrementally, final_only buffers all projected ACP output until terminal turn events.", "acp.stream.maxTurnChars": "Maximum assistant text characters projected per ACP turn before truncation notice is emitted.", "acp.stream.maxToolSummaryChars": diff --git a/temp/acp-meta-event-dedup-implementation-plan-2026-03-01.md b/temp/acp-meta-event-dedup-implementation-plan-2026-03-01.md index e249a92cf..479130459 100644 --- a/temp/acp-meta-event-dedup-implementation-plan-2026-03-01.md +++ b/temp/acp-meta-event-dedup-implementation-plan-2026-03-01.md @@ -60,6 +60,7 @@ Default should be minimal-noise out of the box: - `repeatSuppression=false`: - forward repeated status/tool updates as they arrive. 3. Keep existing text streaming path and existing guardrails (`maxTurnChars`, meta caps). + - In `deliveryMode=final_only`, defer all projected ACP output (text + meta/tool) until terminal turn events. 4. Keep canonical formatting: - system lines via `prefixSystemMessage(...)` - tool lines via shared tool formatter path.