fix(gateway): flush chat delta before tool-start events (#39128)
Co-authored-by: john <john.j@min123.net>
This commit is contained in:
@@ -264,6 +264,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/OpenAI WS compat store flag: omit `store` from `response.create` payloads when model compat sets `supportsStore: false`, preventing strict OpenAI-compatible providers from rejecting websocket requests with unknown-field errors. (#39113) Thanks @scoootscooob.
|
||||
- Config/validation log sanitization: sanitize config-validation issue paths/messages before logging so control characters and ANSI escape sequences cannot inject misleading terminal output from crafted config content. (#39116) Thanks @powermaster888.
|
||||
- Agents/compaction counter accuracy: count successful overflow-triggered auto-compactions (`willRetry=true`) in the compaction counter while still excluding aborted/no-result events, so `/status` reflects actual safeguard compaction activity. (#39123) Thanks @MumuTW.
|
||||
- Gateway/chat delta ordering: flush buffered assistant deltas before emitting tool `start` events so pre-tool text is delivered to Control UI before tool cards, avoiding transient text/tool ordering artifacts in streaming. (#39128) Thanks @0xtangping.
|
||||
|
||||
## 2026.3.2
|
||||
|
||||
|
||||
@@ -470,6 +470,74 @@ describe("agent event handler", () => {
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes buffered chat delta before tool start events", () => {
|
||||
let now = 12_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const {
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
nodeSendToSession,
|
||||
chatRunState,
|
||||
toolEventRecipients,
|
||||
handler,
|
||||
} = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-tool-flush",
|
||||
});
|
||||
|
||||
chatRunState.registry.add("run-tool-flush", {
|
||||
sessionKey: "session-tool-flush",
|
||||
clientRunId: "client-tool-flush",
|
||||
});
|
||||
registerAgentRunContext("run-tool-flush", {
|
||||
sessionKey: "session-tool-flush",
|
||||
verboseLevel: "off",
|
||||
});
|
||||
toolEventRecipients.add("run-tool-flush", "conn-1");
|
||||
|
||||
handler({
|
||||
runId: "run-tool-flush",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Before tool" },
|
||||
});
|
||||
|
||||
// Throttled assistant update (within 150ms window).
|
||||
now = 12_050;
|
||||
handler({
|
||||
runId: "run-tool-flush",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Before tool expanded" },
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-tool-flush",
|
||||
seq: 3,
|
||||
stream: "tool",
|
||||
ts: Date.now(),
|
||||
data: { phase: "start", name: "read", toolCallId: "tool-flush-1" },
|
||||
});
|
||||
|
||||
const chatCalls = chatBroadcastCalls(broadcast);
|
||||
expect(chatCalls).toHaveLength(2);
|
||||
const flushedPayload = chatCalls[1]?.[1] as {
|
||||
state?: string;
|
||||
message?: { content?: Array<{ text?: string }> };
|
||||
};
|
||||
expect(flushedPayload.state).toBe("delta");
|
||||
expect(flushedPayload.message?.content?.[0]?.text).toBe("Before tool expanded");
|
||||
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2);
|
||||
|
||||
expect(broadcastToConnIds).toHaveBeenCalledTimes(1);
|
||||
const flushCallOrder = broadcast.mock.invocationCallOrder[1] ?? 0;
|
||||
const toolCallOrder = broadcastToConnIds.mock.invocationCallOrder[0] ?? Number.MAX_SAFE_INTEGER;
|
||||
expect(flushCallOrder).toBeLessThan(toolCallOrder);
|
||||
nowSpy.mockRestore();
|
||||
resetAgentRunContextForTest();
|
||||
});
|
||||
|
||||
it("routes tool events only to registered recipients when verbose is enabled", () => {
|
||||
const { broadcast, broadcastToConnIds, toolEventRecipients, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-1",
|
||||
|
||||
@@ -390,6 +390,60 @@ export function createAgentEventHandler({
|
||||
nodeSendToSession(sessionKey, "chat", payload);
|
||||
};
|
||||
|
||||
const flushBufferedChatDeltaIfNeeded = (
|
||||
sessionKey: string,
|
||||
clientRunId: string,
|
||||
sourceRunId: string,
|
||||
seq: number,
|
||||
) => {
|
||||
const bufferedText = stripInlineDirectiveTagsForDisplay(
|
||||
chatRunState.buffers.get(clientRunId) ?? "",
|
||||
).text.trim();
|
||||
const normalizedHeartbeatText = normalizeHeartbeatChatFinalText({
|
||||
runId: clientRunId,
|
||||
sourceRunId,
|
||||
text: bufferedText,
|
||||
});
|
||||
const text = normalizedHeartbeatText.text.trim();
|
||||
const shouldSuppressSilent =
|
||||
normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN);
|
||||
const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text);
|
||||
const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput(
|
||||
clientRunId,
|
||||
sourceRunId,
|
||||
);
|
||||
if (
|
||||
!text ||
|
||||
shouldSuppressSilent ||
|
||||
shouldSuppressSilentLeadFragment ||
|
||||
shouldSuppressHeartbeatStreaming
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
|
||||
if (text.length <= lastBroadcastLen) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const flushPayload = {
|
||||
runId: clientRunId,
|
||||
sessionKey,
|
||||
seq,
|
||||
state: "delta" as const,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text }],
|
||||
timestamp: now,
|
||||
},
|
||||
};
|
||||
broadcast("chat", flushPayload, { dropIfSlow: true });
|
||||
nodeSendToSession(sessionKey, "chat", flushPayload);
|
||||
chatRunState.deltaLastBroadcastLen.set(clientRunId, text.length);
|
||||
chatRunState.deltaSentAt.set(clientRunId, now);
|
||||
};
|
||||
|
||||
const emitChatFinal = (
|
||||
sessionKey: string,
|
||||
clientRunId: string,
|
||||
@@ -410,38 +464,11 @@ export function createAgentEventHandler({
|
||||
const text = normalizedHeartbeatText.text.trim();
|
||||
const shouldSuppressSilent =
|
||||
normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN);
|
||||
const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text);
|
||||
const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput(
|
||||
clientRunId,
|
||||
sourceRunId,
|
||||
);
|
||||
// Flush any throttled delta so streaming clients receive the complete text
|
||||
// before the final event. The 150 ms throttle in emitChatDelta may have
|
||||
// before the final event. The 150 ms throttle in emitChatDelta may have
|
||||
// suppressed the most recent chunk, leaving the client with stale text.
|
||||
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
|
||||
if (
|
||||
text &&
|
||||
!shouldSuppressSilent &&
|
||||
!shouldSuppressSilentLeadFragment &&
|
||||
!shouldSuppressHeartbeatStreaming
|
||||
) {
|
||||
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
|
||||
if (text.length > lastBroadcastLen) {
|
||||
const flushPayload = {
|
||||
runId: clientRunId,
|
||||
sessionKey,
|
||||
seq,
|
||||
state: "delta" as const,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
};
|
||||
broadcast("chat", flushPayload, { dropIfSlow: true });
|
||||
nodeSendToSession(sessionKey, "chat", flushPayload);
|
||||
}
|
||||
}
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq);
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
@@ -542,6 +569,12 @@ export function createAgentEventHandler({
|
||||
}
|
||||
agentRunSeq.set(evt.runId, evt.seq);
|
||||
if (isToolEvent) {
|
||||
const toolPhase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
|
||||
// Flush pending assistant text before tool-start events so clients can
|
||||
// render complete pre-tool text above tool cards (not truncated by delta throttle).
|
||||
if (toolPhase === "start" && isControlUiVisible && sessionKey && !isAborted) {
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
|
||||
}
|
||||
// Always broadcast tool events to registered WS recipients with
|
||||
// tool-events capability, regardless of verboseLevel. The verbose
|
||||
// setting only controls whether tool details are sent as channel
|
||||
|
||||
Reference in New Issue
Block a user