fix: stabilize partial streaming filters

This commit is contained in:
Ayaan Zaidi
2026-01-31 22:29:00 +05:30
committed by Ayaan Zaidi
parent a64d8d2d66
commit b5c2b1880d
3 changed files with 61 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ import {
isMessagingToolDuplicateNormalized,
normalizeTextForComparison,
} from "./pi-embedded-helpers.js";
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js";
import {
@@ -17,6 +18,18 @@ import {
} from "./pi-embedded-utils.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
const stripTrailingDirective = (text: string): string => {
const openIndex = text.lastIndexOf("[[");
if (openIndex < 0) {
return text;
}
const closeIndex = text.indexOf("]]", openIndex + 2);
if (closeIndex >= 0) {
return text;
}
return text.slice(0, openIndex);
};
export function handleMessageStart(
ctx: EmbeddedPiSubscribeContext,
evt: AgentEvent & { message: AgentMessage },
@@ -109,40 +122,54 @@ export function handleMessageUpdate(
inlineCode: createInlineCodeState(),
})
.trim();
if (next && next !== ctx.state.lastStreamedAssistant) {
const parsedDelta = chunk ? ctx.consumePartialReplyDirectives(chunk) : null;
const deltaText = parsedDelta?.text ?? "";
if (next) {
const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : "";
const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
const cleanedText = parsedFull.text;
const mediaUrls = parsedDelta?.mediaUrls;
if (!deltaText && (!mediaUrls || mediaUrls.length === 0) && !parsedDelta?.audioAsVoice) {
ctx.state.lastStreamedAssistant = next;
return;
}
const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0);
const hasAudio = Boolean(parsedDelta?.audioAsVoice);
const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
const cleanedText = `${previousCleaned}${deltaText}`;
let shouldEmit = false;
let deltaText = "";
if (!cleanedText && !hasMedia && !hasAudio) {
shouldEmit = false;
} else if (previousCleaned && !cleanedText.startsWith(previousCleaned)) {
shouldEmit = false;
} else {
deltaText = cleanedText.slice(previousCleaned.length);
shouldEmit = Boolean(deltaText || hasMedia || hasAudio);
}
ctx.state.lastStreamedAssistant = next;
ctx.state.lastStreamedAssistantCleaned = cleanedText;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
if (shouldEmit) {
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: hasMedia ? mediaUrls : undefined,
},
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: hasMedia ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: hasMedia ? mediaUrls : undefined,
});
}
}
}

View File

@@ -37,6 +37,7 @@ export type EmbeddedPiSubscribeState = {
deltaBuffer: string;
blockBuffer: string;
blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState };
partialBlockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState };
lastStreamedAssistant?: string;
lastStreamedAssistantCleaned?: string;
lastStreamedReasoning?: string;

View File

@@ -46,6 +46,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
blockBuffer: "",
// Track if a streamed chunk opened a <think> block (stateful across chunks).
blockState: { thinking: false, final: false, inlineCode: createInlineCodeState() },
partialBlockState: { thinking: false, final: false, inlineCode: createInlineCodeState() },
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
lastStreamedReasoning: undefined,
@@ -89,6 +90,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.blockState.thinking = false;
state.blockState.final = false;
state.blockState.inlineCode = createInlineCodeState();
state.partialBlockState.thinking = false;
state.partialBlockState.final = false;
state.partialBlockState.inlineCode = createInlineCodeState();
state.lastStreamedAssistant = undefined;
state.lastStreamedAssistantCleaned = undefined;
state.lastBlockReplyText = undefined;