From dd2fcade3ef444e4a52c866e3a41adfe346542c3 Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 15:59:20 +0100 Subject: [PATCH] ACP: make live mode flush incremental chunks --- src/auto-reply/reply/acp-projector.test.ts | 77 ++++++++++++++++++- src/auto-reply/reply/acp-projector.ts | 4 +- .../reply/acp-stream-settings.test.ts | 23 ++++++ src/auto-reply/reply/acp-stream-settings.ts | 22 +++++- 4 files changed, 123 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 40bcf84e9..3a5d960fb 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { prefixSystemMessage } from "../../infra/system-message.js"; import { createAcpReplyProjector } from "./acp-projector.js"; import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js"; @@ -28,6 +28,81 @@ describe("createAcpReplyProjector", () => { ]); }); + it("flushes staggered live text deltas after idle gaps", async () => { + vi.useFakeTimers(); + try { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + coalesceIdleMs: 50, + maxChunkChars: 64, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ type: "text_delta", text: "A", tag: "agent_message_chunk" }); + await vi.advanceTimersByTimeAsync(60); + await projector.flush(false); + + await projector.onEvent({ type: "text_delta", text: "B", tag: "agent_message_chunk" }); + await vi.advanceTimersByTimeAsync(60); + await projector.flush(false); + + await projector.onEvent({ type: "text_delta", text: "C", tag: "agent_message_chunk" }); + await vi.advanceTimersByTimeAsync(60); + await projector.flush(false); + + expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([ + { kind: "block", text: "A" }, + { kind: "block", text: "B" }, + { kind: "block", text: "C" }, + ]); + } finally { + vi.useRealTimers(); + } + }); + + it("splits oversized live text by maxChunkChars", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + coalesceIdleMs: 0, + maxChunkChars: 50, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + const text = `${"a".repeat(50)}${"b".repeat(50)}${"c".repeat(20)}`; + await projector.onEvent({ type: "text_delta", text, tag: "agent_message_chunk" }); + await projector.flush(true); + + expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([ + { kind: "block", text: "a".repeat(50) }, + { kind: "block", text: "b".repeat(50) }, + { kind: "block", text: "c".repeat(20) }, + ]); + }); + it("supports deliveryMode=final_only by buffering all projected output until done", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 320fc976c..18326a0a3 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -141,6 +141,7 @@ export function createAcpReplyProjector(params: { cfg: params.cfg, provider: params.provider, accountId: params.accountId, + deliveryMode: settings.deliveryMode, }); const blockReplyPipeline = createBlockReplyPipeline({ onBlockReply: async (payload) => { @@ -179,8 +180,9 @@ export function createAcpReplyProjector(params: { if (settings.deliveryMode === "final_only" && !force) { return; } + const effectiveForce = settings.deliveryMode === "live" ? true : force; chunker.drain({ - force, + force: effectiveForce, emit: (chunk) => { blockReplyPipeline.enqueue({ text: chunk }); }, diff --git a/src/auto-reply/reply/acp-stream-settings.test.ts b/src/auto-reply/reply/acp-stream-settings.test.ts index ef35508db..a5ffd1a6d 100644 --- a/src/auto-reply/reply/acp-stream-settings.test.ts +++ b/src/auto-reply/reply/acp-stream-settings.test.ts @@ -89,4 +89,27 @@ describe("acp stream settings", () => { expect(streaming.chunking.maxChars).toBe(64); expect(streaming.coalescing.idleMs).toBe(0); }); + + it("applies live-mode streaming overrides for incremental delivery", () => { + const streaming = resolveAcpStreamingConfig({ + cfg: createAcpTestConfig({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + coalesceIdleMs: 350, + maxChunkChars: 256, + }, + }, + }), + provider: "discord", + deliveryMode: "live", + }); + expect(streaming.chunking.minChars).toBe(1); + expect(streaming.chunking.maxChars).toBe(256); + expect(streaming.coalescing.minChars).toBe(1); + expect(streaming.coalescing.maxChars).toBe(256); + expect(streaming.coalescing.joiner).toBe(""); + expect(streaming.coalescing.idleMs).toBe(350); + }); }); diff --git a/src/auto-reply/reply/acp-stream-settings.ts b/src/auto-reply/reply/acp-stream-settings.ts index c0d32f1e7..2eae975f8 100644 --- a/src/auto-reply/reply/acp-stream-settings.ts +++ b/src/auto-reply/reply/acp-stream-settings.ts @@ -131,14 +131,34 @@ export function resolveAcpStreamingConfig(params: { cfg: OpenClawConfig; provider?: string; accountId?: string; + deliveryMode?: AcpDeliveryMode; }) { - return resolveEffectiveBlockStreamingConfig({ + const resolved = resolveEffectiveBlockStreamingConfig({ cfg: params.cfg, provider: params.provider, accountId: params.accountId, maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg), coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg), }); + + // In live mode, ACP text deltas should flush promptly and never be held + // behind large generic min-char thresholds. + if (params.deliveryMode === "live") { + return { + chunking: { + ...resolved.chunking, + minChars: 1, + }, + coalescing: { + ...resolved.coalescing, + minChars: 1, + // ACP delta streams already carry spacing/newlines; preserve exact text. + joiner: "", + }, + }; + } + + return resolved; } export function isAcpTagVisible(