fix(feishu): preserve block streaming text when final payload is missing (#30663)
* fix(feishu): preserve block streaming text when final payload is missing When Feishu card streaming receives block payloads without matching final/partial callbacks, keep block text in stream state so onIdle close still publishes the reply instead of an empty message. Add a regression test for block-only streaming. Closes #30628 * Feishu: preserve streaming block fallback when final text is missing --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -226,6 +226,24 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
|
||||
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("closes streaming with block text when final reply is missing", async () => {
|
||||
createFeishuReplyDispatcher({
|
||||
cfg: {} as never,
|
||||
agentId: "agent",
|
||||
runtime: { log: vi.fn(), error: vi.fn() } as never,
|
||||
chatId: "oc_chat",
|
||||
});
|
||||
|
||||
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
|
||||
await options.deliver({ text: "```md\npartial answer\n```" }, { kind: "block" });
|
||||
await options.onIdle?.();
|
||||
|
||||
expect(streamingInstances).toHaveLength(1);
|
||||
expect(streamingInstances[0].start).toHaveBeenCalledTimes(1);
|
||||
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
|
||||
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\npartial answer\n```");
|
||||
});
|
||||
|
||||
it("sends media-only payloads as attachments", async () => {
|
||||
createFeishuReplyDispatcher({
|
||||
cfg: {} as never,
|
||||
|
||||
@@ -146,6 +146,48 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
let partialUpdateQueue: Promise<void> = Promise.resolve();
|
||||
let streamingStartPromise: Promise<void> | null = null;
|
||||
|
||||
const mergeStreamingText = (nextText: string) => {
|
||||
if (!streamText) {
|
||||
streamText = nextText;
|
||||
return;
|
||||
}
|
||||
if (nextText.startsWith(streamText)) {
|
||||
// Handle cumulative partial payloads where nextText already includes prior text.
|
||||
streamText = nextText;
|
||||
return;
|
||||
}
|
||||
if (streamText.endsWith(nextText)) {
|
||||
return;
|
||||
}
|
||||
streamText += nextText;
|
||||
};
|
||||
|
||||
const queueStreamingUpdate = (
|
||||
nextText: string,
|
||||
options?: {
|
||||
dedupeWithLastPartial?: boolean;
|
||||
},
|
||||
) => {
|
||||
if (!nextText) {
|
||||
return;
|
||||
}
|
||||
if (options?.dedupeWithLastPartial && nextText === lastPartial) {
|
||||
return;
|
||||
}
|
||||
if (options?.dedupeWithLastPartial) {
|
||||
lastPartial = nextText;
|
||||
}
|
||||
mergeStreamingText(nextText);
|
||||
partialUpdateQueue = partialUpdateQueue.then(async () => {
|
||||
if (streamingStartPromise) {
|
||||
await streamingStartPromise;
|
||||
}
|
||||
if (streaming?.isActive()) {
|
||||
await streaming.update(streamText);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const startStreaming = () => {
|
||||
if (!streamingEnabled || streamingStartPromise || streaming) {
|
||||
return;
|
||||
@@ -205,12 +247,6 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
void typingCallbacks.onReplyStart?.();
|
||||
},
|
||||
deliver: async (payload: ReplyPayload, info) => {
|
||||
// FIX: Filter out internal 'block' reasoning chunks immediately to prevent
|
||||
// data leak and race conditions with streaming state initialization.
|
||||
if (info?.kind === "block") {
|
||||
return;
|
||||
}
|
||||
|
||||
const text = payload.text ?? "";
|
||||
const mediaList =
|
||||
payload.mediaUrls && payload.mediaUrls.length > 0
|
||||
@@ -228,6 +264,18 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
if (hasText) {
|
||||
const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text));
|
||||
|
||||
if (info?.kind === "block") {
|
||||
// Drop internal block chunks unless we can safely consume them as
|
||||
// streaming-card fallback content.
|
||||
if (!(streamingEnabled && useCard)) {
|
||||
return;
|
||||
}
|
||||
startStreaming();
|
||||
if (streamingStartPromise) {
|
||||
await streamingStartPromise;
|
||||
}
|
||||
}
|
||||
|
||||
if (info?.kind === "final" && streamingEnabled && useCard) {
|
||||
startStreaming();
|
||||
if (streamingStartPromise) {
|
||||
@@ -236,6 +284,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
}
|
||||
|
||||
if (streaming?.isActive()) {
|
||||
if (info?.kind === "block") {
|
||||
// Some runtimes emit block payloads without onPartial/final callbacks.
|
||||
// Mirror block text into streamText so onIdle close still sends content.
|
||||
queueStreamingUpdate(text);
|
||||
}
|
||||
if (info?.kind === "final") {
|
||||
streamText = text;
|
||||
await closeStreaming();
|
||||
@@ -331,19 +384,10 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
onModelSelected: prefixContext.onModelSelected,
|
||||
onPartialReply: streamingEnabled
|
||||
? (payload: ReplyPayload) => {
|
||||
if (!payload.text || payload.text === lastPartial) {
|
||||
if (!payload.text) {
|
||||
return;
|
||||
}
|
||||
lastPartial = payload.text;
|
||||
streamText = payload.text;
|
||||
partialUpdateQueue = partialUpdateQueue.then(async () => {
|
||||
if (streamingStartPromise) {
|
||||
await streamingStartPromise;
|
||||
}
|
||||
if (streaming?.isActive()) {
|
||||
await streaming.update(streamText);
|
||||
}
|
||||
});
|
||||
queueStreamingUpdate(payload.text, { dedupeWithLastPartial: true });
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user