ACP: make final_only defer all projected output

This commit is contained in:
Onur
2026-03-01 13:33:57 +01:00
committed by Onur Solmaz
parent 4a82012461
commit c3a1fe01ae
6 changed files with 151 additions and 20 deletions

View File

@@ -28,7 +28,7 @@ describe("createAcpReplyProjector", () => {
]);
});
it("supports deliveryMode=final_only by buffering deltas until done", async () => {
it("supports deliveryMode=final_only by buffering all projected output until done", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
@@ -38,6 +38,9 @@ describe("createAcpReplyProjector", () => {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
tagVisibility: {
available_commands_update: true,
},
},
},
}),
@@ -53,6 +56,19 @@ describe("createAcpReplyProjector", () => {
text: "What",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "status",
text: "available commands updated (7)",
tag: "available_commands_update",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_1",
status: "in_progress",
title: "List files",
text: "List files (in_progress)",
});
await projector.onEvent({
type: "text_delta",
text: " now?",
@@ -61,7 +77,62 @@ describe("createAcpReplyProjector", () => {
expect(deliveries).toEqual([]);
await projector.onEvent({ type: "done" });
expect(deliveries).toEqual([{ kind: "block", text: "What now?" }]);
expect(deliveries).toHaveLength(3);
expect(deliveries[0]).toEqual({
kind: "tool",
text: prefixSystemMessage("available commands updated (7)"),
});
expect(deliveries[1]?.kind).toBe("tool");
expect(deliveries[1]?.text).toContain("Tool Call");
expect(deliveries[2]).toEqual({ kind: "block", text: "What now?" });
});
it("flushes buffered status/tool output on error in deliveryMode=final_only", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
tagVisibility: {
available_commands_update: true,
},
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "status",
text: "available commands updated (7)",
tag: "available_commands_update",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_2",
status: "in_progress",
title: "Run tests",
text: "Run tests (in_progress)",
});
expect(deliveries).toEqual([]);
await projector.onEvent({ type: "error", message: "turn failed" });
expect(deliveries).toHaveLength(2);
expect(deliveries[0]).toEqual({
kind: "tool",
text: prefixSystemMessage("available commands updated (7)"),
});
expect(deliveries[1]?.kind).toBe("tool");
expect(deliveries[1]?.text).toContain("Tool Call");
});
it("suppresses usage_update by default and allows deduped usage when tag-visible", async () => {
@@ -91,6 +162,7 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 64,
deliveryMode: "live",
tagVisibility: {
usage_update: true,
},
@@ -142,7 +214,6 @@ describe("createAcpReplyProjector", () => {
return true;
},
});
await projector.onEvent({
type: "status",
text: "available commands updated (7)",
@@ -155,7 +226,14 @@ describe("createAcpReplyProjector", () => {
it("dedupes repeated tool lifecycle updates when repeatSuppression is enabled", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg(),
cfg: createCfg({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
@@ -206,7 +284,14 @@ describe("createAcpReplyProjector", () => {
it("renders fallback tool labels without leaking call ids as primary label", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg(),
cfg: createCfg({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
@@ -235,6 +320,7 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
repeatSuppression: false,
tagVisibility: {
available_commands_update: true,
@@ -303,6 +389,7 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
tagVisibility: {
available_commands_update: true,
},
@@ -388,6 +475,7 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
maxMetaEventsPerTurn: 1,
tagVisibility: {
usage_update: true,
@@ -438,6 +526,7 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
tagVisibility: {
tool_call_update: false,
},

View File

@@ -29,6 +29,11 @@ type ToolLifecycleState = {
lastRenderedHash?: string;
};
type BufferedToolDelivery = {
payload: ReplyPayload;
meta?: AcpProjectedDeliveryMeta;
};
function truncateText(input: string, maxChars: number): string {
if (input.length <= maxChars) {
return input;
@@ -109,6 +114,7 @@ export function createAcpReplyProjector(params: {
let lastStatusHash: string | undefined;
let lastToolHash: string | undefined;
let lastUsageTuple: string | undefined;
const pendingToolDeliveries: BufferedToolDelivery[] = [];
const toolLifecycleById = new Map<string, ToolLifecycleState>();
const resetTurnState = () => {
@@ -118,6 +124,7 @@ export function createAcpReplyProjector(params: {
lastStatusHash = undefined;
lastToolHash = undefined;
lastUsageTuple = undefined;
pendingToolDeliveries.length = 0;
toolLifecycleById.clear();
};
@@ -133,7 +140,17 @@ export function createAcpReplyProjector(params: {
});
};
const flushBufferedToolDeliveries = async (force: boolean) => {
if (!(settings.deliveryMode === "final_only" && force)) {
return;
}
for (const entry of pendingToolDeliveries.splice(0, pendingToolDeliveries.length)) {
await params.deliver("tool", entry.payload, entry.meta);
}
};
const flush = async (force = false): Promise<void> => {
await flushBufferedToolDeliveries(force);
drainChunker(force);
await blockReplyPipeline.flush({ force });
};
@@ -170,10 +187,15 @@ export function createAcpReplyProjector(params: {
if (!consumeMetaQuota(opts?.force === true)) {
return;
}
if (settings.deliveryMode === "live") {
if (settings.deliveryMode === "final_only") {
pendingToolDeliveries.push({
payload: { text: formatted },
meta,
});
} else {
await flush(true);
await params.deliver("tool", { text: formatted }, meta);
}
await params.deliver("tool", { text: formatted }, meta);
lastStatusHash = hash;
};
@@ -226,19 +248,21 @@ export function createAcpReplyProjector(params: {
if (!consumeMetaQuota(opts?.force === true)) {
return;
}
if (settings.deliveryMode === "live") {
const deliveryMeta: AcpProjectedDeliveryMeta = {
...(event.tag ? { tag: event.tag } : {}),
...(toolCallId ? { toolCallId } : {}),
...(status ? { toolStatus: status } : {}),
allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"),
};
if (settings.deliveryMode === "final_only") {
pendingToolDeliveries.push({
payload: { text: toolSummary },
meta: deliveryMeta,
});
} else {
await flush(true);
await params.deliver("tool", { text: toolSummary }, deliveryMeta);
}
await params.deliver(
"tool",
{ text: toolSummary },
{
...(event.tag ? { tag: event.tag } : {}),
...(toolCallId ? { toolCallId } : {}),
...(status ? { toolStatus: status } : {}),
allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"),
},
);
lastToolHash = hash;
};

View File

@@ -39,6 +39,20 @@ describe("acp stream settings", () => {
expect(settings.tagVisibility.usage_update).toBe(true);
});
it("accepts explicit deliveryMode=live override", () => {
const settings = resolveAcpProjectionSettings(
createAcpTestConfig({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
},
},
}),
);
expect(settings.deliveryMode).toBe("live");
});
it("uses default tag visibility when no override is provided", () => {
const settings = resolveAcpProjectionSettings(createAcpTestConfig());
expect(isAcpTagVisible(settings, "tool_call")).toBe(true);

View File

@@ -59,7 +59,10 @@ function clampBoolean(value: unknown, fallback: boolean): boolean {
}
function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode {
return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE;
if (value === "live" || value === "final_only") {
return value;
}
return DEFAULT_ACP_DELIVERY_MODE;
}
function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number {

View File

@@ -175,7 +175,7 @@ export const FIELD_HELP: Record<string, string> = {
"acp.stream.repeatSuppression":
"When true (default), suppress repeated ACP status/tool projection lines in a turn while keeping raw ACP events unchanged.",
"acp.stream.deliveryMode":
"ACP delivery style: live streams block chunks incrementally, final_only buffers text deltas until terminal turn events.",
"ACP delivery style: live streams projected output incrementally, final_only buffers all projected ACP output until terminal turn events.",
"acp.stream.maxTurnChars":
"Maximum assistant text characters projected per ACP turn before truncation notice is emitted.",
"acp.stream.maxToolSummaryChars":

View File

@@ -60,6 +60,7 @@ Default should be minimal-noise out of the box:
- `repeatSuppression=false`:
- forward repeated status/tool updates as they arrive.
3. Keep existing text streaming path and existing guardrails (`maxTurnChars`, meta caps).
- In `deliveryMode=final_only`, defer all projected ACP output (text + meta/tool) until terminal turn events.
4. Keep canonical formatting:
- system lines via `prefixSystemMessage(...)`
- tool lines via shared tool formatter path.