From b5c2b1880d8ccb50d1c61950bf6c2a6ec333c219 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 31 Jan 2026 22:29:00 +0530 Subject: [PATCH] fix: stabilize partial streaming filters --- ...pi-embedded-subscribe.handlers.messages.ts | 85 ++++++++++++------- .../pi-embedded-subscribe.handlers.types.ts | 1 + src/agents/pi-embedded-subscribe.ts | 4 + 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 2b1a6df28..bbaa3276d 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -5,6 +5,7 @@ import { isMessagingToolDuplicateNormalized, normalizeTextForComparison, } from "./pi-embedded-helpers.js"; +import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js"; import { @@ -17,6 +18,18 @@ import { } from "./pi-embedded-utils.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; +const stripTrailingDirective = (text: string): string => { + const openIndex = text.lastIndexOf("[["); + if (openIndex < 0) { + return text; + } + const closeIndex = text.indexOf("]]", openIndex + 2); + if (closeIndex >= 0) { + return text; + } + return text.slice(0, openIndex); +}; + export function handleMessageStart( ctx: EmbeddedPiSubscribeContext, evt: AgentEvent & { message: AgentMessage }, @@ -109,40 +122,54 @@ export function handleMessageUpdate( inlineCode: createInlineCodeState(), }) .trim(); - if (next && next !== ctx.state.lastStreamedAssistant) { - const parsedDelta = chunk ? ctx.consumePartialReplyDirectives(chunk) : null; - const deltaText = parsedDelta?.text ?? ""; + if (next) { + const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; + const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null; + const parsedFull = parseReplyDirectives(stripTrailingDirective(next)); + const cleanedText = parsedFull.text; const mediaUrls = parsedDelta?.mediaUrls; - if (!deltaText && (!mediaUrls || mediaUrls.length === 0) && !parsedDelta?.audioAsVoice) { - ctx.state.lastStreamedAssistant = next; - return; - } + const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0); + const hasAudio = Boolean(parsedDelta?.audioAsVoice); const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? ""; - const cleanedText = `${previousCleaned}${deltaText}`; + + let shouldEmit = false; + let deltaText = ""; + if (!cleanedText && !hasMedia && !hasAudio) { + shouldEmit = false; + } else if (previousCleaned && !cleanedText.startsWith(previousCleaned)) { + shouldEmit = false; + } else { + deltaText = cleanedText.slice(previousCleaned.length); + shouldEmit = Boolean(deltaText || hasMedia || hasAudio); + } + ctx.state.lastStreamedAssistant = next; ctx.state.lastStreamedAssistantCleaned = cleanedText; - emitAgentEvent({ - runId: ctx.params.runId, - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - void ctx.params.onAgentEvent?.({ - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { - void ctx.params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + + if (shouldEmit) { + emitAgentEvent({ + runId: ctx.params.runId, + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: hasMedia ? mediaUrls : undefined, + }, }); + void ctx.params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: hasMedia ? mediaUrls : undefined, + }, + }); + if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { + void ctx.params.onPartialReply({ + text: cleanedText, + mediaUrls: hasMedia ? mediaUrls : undefined, + }); + } } } diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 92869fda3..e7029845e 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -37,6 +37,7 @@ export type EmbeddedPiSubscribeState = { deltaBuffer: string; blockBuffer: string; blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; + partialBlockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; lastStreamedAssistant?: string; lastStreamedAssistantCleaned?: string; lastStreamedReasoning?: string; diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 5d04f8f8e..f74164fa3 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -46,6 +46,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar blockBuffer: "", // Track if a streamed chunk opened a block (stateful across chunks). blockState: { thinking: false, final: false, inlineCode: createInlineCodeState() }, + partialBlockState: { thinking: false, final: false, inlineCode: createInlineCodeState() }, lastStreamedAssistant: undefined, lastStreamedAssistantCleaned: undefined, lastStreamedReasoning: undefined, @@ -89,6 +90,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.blockState.thinking = false; state.blockState.final = false; state.blockState.inlineCode = createInlineCodeState(); + state.partialBlockState.thinking = false; + state.partialBlockState.final = false; + state.partialBlockState.inlineCode = createInlineCodeState(); state.lastStreamedAssistant = undefined; state.lastStreamedAssistantCleaned = undefined; state.lastBlockReplyText = undefined;