diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index b9318e4ea..1ebee8eef 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -108,13 +108,19 @@ export function handleMessageUpdate( }) .trim(); if (next && next !== ctx.state.lastStreamedAssistant) { + const previousText = ctx.state.lastStreamedAssistant ?? ""; ctx.state.lastStreamedAssistant = next; const { text: cleanedText, mediaUrls } = parseReplyDirectives(next); + const { text: previousCleanedText } = parseReplyDirectives(previousText); + const deltaText = cleanedText.startsWith(previousCleanedText) + ? cleanedText.slice(previousCleanedText.length) + : cleanedText; emitAgentEvent({ runId: ctx.params.runId, stream: "assistant", data: { text: cleanedText, + delta: deltaText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }, }); @@ -122,6 +128,7 @@ export function handleMessageUpdate( stream: "assistant", data: { text: cleanedText, + delta: deltaText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }, }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 3af91731f..486cbfa65 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -146,4 +146,42 @@ describe("subscribeEmbeddedPiSession", () => { expect(combined).toBe("Final answer"); }, ); + + it("emits delta chunks in agent events for streaming assistant text", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "Hello" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: " world" }, + }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads[0]?.text).toBe("Hello"); + expect(payloads[0]?.delta).toBe("Hello"); + expect(payloads[1]?.text).toBe("Hello world"); + expect(payloads[1]?.delta).toBe(" world"); + }); });