refactor(test): dedupe pi embedded subscribe e2e harness

This commit is contained in:
Peter Steinberger
2026-02-15 21:18:10 +00:00
parent 059573a48d
commit a02e5759cc
16 changed files with 49 additions and 186 deletions

View File

@@ -0,0 +1,18 @@
import type { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type PiSession = Parameters<subscribeEmbeddedPiSession>[0]["session"];
export function createStubSessionHarness(): {
session: PiSession;
emit: (evt: unknown) => void;
} {
let handler: ((evt: unknown) => void) | undefined;
const session = {
subscribe: (fn: (evt: unknown) => void) => {
handler = fn;
return () => {};
},
} as unknown as PiSession;
return { session, emit: (evt: unknown) => handler?.(evt) };
}

View File

@@ -8,13 +8,6 @@ type StubSession = {
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {

View File

@@ -6,13 +6,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
function setupTextEndSubscription() {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -8,13 +8,6 @@ type StubSession = {
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("does not call onBlockReplyFlush when callback is not provided", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {

View File

@@ -6,13 +6,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("does not duplicate when text_end repeats full content", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -1,40 +1,22 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("does not emit duplicate block replies when text_end repeats", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
handler?.({
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
@@ -43,7 +25,7 @@ describe("subscribeEmbeddedPiSession", () => {
},
});
handler?.({
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
@@ -51,7 +33,7 @@ describe("subscribeEmbeddedPiSession", () => {
},
});
handler?.({
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
@@ -63,16 +45,10 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("does not duplicate assistantTexts when message_end repeats", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
});
@@ -81,22 +57,16 @@ describe("subscribeEmbeddedPiSession", () => {
content: [{ type: "text", text: "Hello world" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(subscription.assistantTexts).toEqual(["Hello world"]);
});
it("does not duplicate assistantTexts when message_end repeats with trailing whitespace changes", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
});
@@ -110,22 +80,16 @@ describe("subscribeEmbeddedPiSession", () => {
content: [{ type: "text", text: "Hello world" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessageWithNewline });
handler?.({ type: "message_end", message: assistantMessageTrimmed });
emit({ type: "message_end", message: assistantMessageWithNewline });
emit({ type: "message_end", message: assistantMessageTrimmed });
expect(subscription.assistantTexts).toEqual(["Hello world"]);
});
it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
reasoningMode: "on",
});
@@ -138,37 +102,31 @@ describe("subscribeEmbeddedPiSession", () => {
],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(subscription.assistantTexts).toEqual(["Hello world"]);
});
it("populates assistantTexts for non-streaming models with chunking enabled", () => {
// Non-streaming models (e.g. zai/glm-4.7): no text_delta events; message_end
// must still populate assistantTexts so providers can deliver a final reply.
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
blockReplyChunking: { minChars: 50, maxChars: 200 }, // Chunking enabled
});
// Simulate non-streaming model: only message_start and message_end, no text_delta
handler?.({ type: "message_start", message: { role: "assistant" } });
emit({ type: "message_start", message: { role: "assistant" } });
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "Response from non-streaming model" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(subscription.assistantTexts).toEqual(["Response from non-streaming model"]);
});

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("emits block replies on text_end and does not duplicate on message_end", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("filters to <final> and suppresses output without a start tag", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -6,13 +6,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("includes canvas action metadata in tool summaries", async () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("keeps assistantTexts to the final answer when block replies are disabled", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("keeps indented fenced blocks intact", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("reopens fenced blocks when splitting inside them", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("splits long single-line fenced blocks with reopen/close", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -1,32 +1,16 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("streams soft chunks with paragraph preference", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
@@ -39,7 +23,7 @@ describe("subscribeEmbeddedPiSession", () => {
const text = "First block line\n\nSecond block line";
handler?.({
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
@@ -53,7 +37,7 @@ describe("subscribeEmbeddedPiSession", () => {
content: [{ type: "text", text }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply.mock.calls[0][0].text).toBe("First block line");
@@ -61,18 +45,12 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["First block line", "Second block line"]);
});
it("avoids splitting inside fenced code blocks", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
@@ -85,7 +63,7 @@ describe("subscribeEmbeddedPiSession", () => {
const text = "Intro\n\n```bash\nline1\nline2\n```\n\nOutro";
handler?.({
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
@@ -99,7 +77,7 @@ describe("subscribeEmbeddedPiSession", () => {
content: [{ type: "text", text }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(3);
expect(onBlockReply.mock.calls[0][0].text).toBe("Intro");

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("suppresses message_end block replies when the message tool already sent", async () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -7,13 +7,6 @@ type StubSession = {
};
describe("subscribeEmbeddedPiSession", () => {
const _THINKING_TAG_CASES = [
{ tag: "think", open: "<think>", close: "</think>" },
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
{ tag: "thought", open: "<thought>", close: "</thought>" },
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
] as const;
it("waits for multiple compaction retries before resolving", async () => {
const listeners: SessionEventHandler[] = [];
const session = {