ACP: make live mode flush incremental chunks

This commit is contained in:
Onur
2026-03-01 15:59:20 +01:00
committed by Onur Solmaz
parent 43c57005a6
commit dd2fcade3e
4 changed files with 123 additions and 3 deletions

View File

@@ -1,4 +1,4 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js";
@@ -28,6 +28,81 @@ describe("createAcpReplyProjector", () => {
]);
});
it("flushes staggered live text deltas after idle gaps", async () => {
vi.useFakeTimers();
try {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
coalesceIdleMs: 50,
maxChunkChars: 64,
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({ type: "text_delta", text: "A", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60);
await projector.flush(false);
await projector.onEvent({ type: "text_delta", text: "B", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60);
await projector.flush(false);
await projector.onEvent({ type: "text_delta", text: "C", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60);
await projector.flush(false);
expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([
{ kind: "block", text: "A" },
{ kind: "block", text: "B" },
{ kind: "block", text: "C" },
]);
} finally {
vi.useRealTimers();
}
});
it("splits oversized live text by maxChunkChars", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
coalesceIdleMs: 0,
maxChunkChars: 50,
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
const text = `${"a".repeat(50)}${"b".repeat(50)}${"c".repeat(20)}`;
await projector.onEvent({ type: "text_delta", text, tag: "agent_message_chunk" });
await projector.flush(true);
expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([
{ kind: "block", text: "a".repeat(50) },
{ kind: "block", text: "b".repeat(50) },
{ kind: "block", text: "c".repeat(20) },
]);
});
it("supports deliveryMode=final_only by buffering all projected output until done", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({

View File

@@ -141,6 +141,7 @@ export function createAcpReplyProjector(params: {
cfg: params.cfg,
provider: params.provider,
accountId: params.accountId,
deliveryMode: settings.deliveryMode,
});
const blockReplyPipeline = createBlockReplyPipeline({
onBlockReply: async (payload) => {
@@ -179,8 +180,9 @@ export function createAcpReplyProjector(params: {
if (settings.deliveryMode === "final_only" && !force) {
return;
}
const effectiveForce = settings.deliveryMode === "live" ? true : force;
chunker.drain({
force,
force: effectiveForce,
emit: (chunk) => {
blockReplyPipeline.enqueue({ text: chunk });
},

View File

@@ -89,4 +89,27 @@ describe("acp stream settings", () => {
expect(streaming.chunking.maxChars).toBe(64);
expect(streaming.coalescing.idleMs).toBe(0);
});
it("applies live-mode streaming overrides for incremental delivery", () => {
const streaming = resolveAcpStreamingConfig({
cfg: createAcpTestConfig({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
coalesceIdleMs: 350,
maxChunkChars: 256,
},
},
}),
provider: "discord",
deliveryMode: "live",
});
expect(streaming.chunking.minChars).toBe(1);
expect(streaming.chunking.maxChars).toBe(256);
expect(streaming.coalescing.minChars).toBe(1);
expect(streaming.coalescing.maxChars).toBe(256);
expect(streaming.coalescing.joiner).toBe("");
expect(streaming.coalescing.idleMs).toBe(350);
});
});

View File

@@ -131,14 +131,34 @@ export function resolveAcpStreamingConfig(params: {
cfg: OpenClawConfig;
provider?: string;
accountId?: string;
deliveryMode?: AcpDeliveryMode;
}) {
return resolveEffectiveBlockStreamingConfig({
const resolved = resolveEffectiveBlockStreamingConfig({
cfg: params.cfg,
provider: params.provider,
accountId: params.accountId,
maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg),
coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg),
});
// In live mode, ACP text deltas should flush promptly and never be held
// behind large generic min-char thresholds.
if (params.deliveryMode === "live") {
return {
chunking: {
...resolved.chunking,
minChars: 1,
},
coalescing: {
...resolved.coalescing,
minChars: 1,
// ACP delta streams already carry spacing/newlines; preserve exact text.
joiner: "",
},
};
}
return resolved;
}
export function isAcpTagVisible(