fix: stabilize flaky tests and sanitize directive-only chat tags
This commit is contained in:
@@ -1430,6 +1430,7 @@ describe("subagent announce formatting", () => {
|
||||
requesterSessionKey: "agent:main:subagent:orchestrator",
|
||||
requesterDisplayKey: "agent:main:subagent:orchestrator",
|
||||
...defaultOutcomeAnnounce,
|
||||
timeoutMs: 100,
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
|
||||
@@ -824,6 +824,8 @@ describe("Cron issue regressions", () => {
|
||||
let now = dueAt;
|
||||
let activeRuns = 0;
|
||||
let peakActiveRuns = 0;
|
||||
const startedRunIds = new Set<string>();
|
||||
const bothRunsStarted = createDeferred<void>();
|
||||
const firstRun = createDeferred<{ status: "ok"; summary: string }>();
|
||||
const secondRun = createDeferred<{ status: "ok"; summary: string }>();
|
||||
const state = createCronServiceState({
|
||||
@@ -837,6 +839,10 @@ describe("Cron issue regressions", () => {
|
||||
runIsolatedAgentJob: vi.fn(async (params: { job: { id: string } }) => {
|
||||
activeRuns += 1;
|
||||
peakActiveRuns = Math.max(peakActiveRuns, activeRuns);
|
||||
startedRunIds.add(params.job.id);
|
||||
if (startedRunIds.size === 2) {
|
||||
bothRunsStarted.resolve();
|
||||
}
|
||||
try {
|
||||
const result =
|
||||
params.job.id === first.id ? await firstRun.promise : await secondRun.promise;
|
||||
@@ -849,7 +855,12 @@ describe("Cron issue regressions", () => {
|
||||
});
|
||||
|
||||
const timerPromise = onTimer(state);
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
await Promise.race([
|
||||
bothRunsStarted.promise,
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("timed out waiting for concurrent cron runs")), 1_000),
|
||||
),
|
||||
]);
|
||||
|
||||
expect(peakActiveRuns).toBe(2);
|
||||
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { createMockSessionEntry, createTranscriptFixtureSync } from "./chat.test-helpers.js";
|
||||
import type { GatewayRequestContext } from "./types.js";
|
||||
|
||||
const mockState = vi.hoisted(() => ({
|
||||
@@ -16,15 +13,11 @@ vi.mock("../session-utils.js", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("../session-utils.js")>();
|
||||
return {
|
||||
...original,
|
||||
loadSessionEntry: () => ({
|
||||
cfg: {},
|
||||
storePath: path.join(path.dirname(mockState.transcriptPath), "sessions.json"),
|
||||
entry: {
|
||||
loadSessionEntry: () =>
|
||||
createMockSessionEntry({
|
||||
transcriptPath: mockState.transcriptPath,
|
||||
sessionId: mockState.sessionId,
|
||||
sessionFile: mockState.transcriptPath,
|
||||
},
|
||||
canonicalKey: "main",
|
||||
}),
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
@@ -48,19 +41,10 @@ vi.mock("../../auto-reply/dispatch.js", () => ({
|
||||
const { chatHandlers } = await import("./chat.js");
|
||||
|
||||
function createTranscriptFixture(prefix: string) {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), prefix));
|
||||
const transcriptPath = path.join(dir, "sess.jsonl");
|
||||
fs.writeFileSync(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({
|
||||
type: "session",
|
||||
version: CURRENT_SESSION_VERSION,
|
||||
id: mockState.sessionId,
|
||||
timestamp: new Date(0).toISOString(),
|
||||
cwd: "/tmp",
|
||||
})}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
const { transcriptPath } = createTranscriptFixtureSync({
|
||||
prefix,
|
||||
sessionId: mockState.sessionId,
|
||||
});
|
||||
mockState.transcriptPath = transcriptPath;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,41 +1,28 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createMockSessionEntry, createTranscriptFixtureSync } from "./chat.test-helpers.js";
|
||||
import type { GatewayRequestContext } from "./types.js";
|
||||
|
||||
// Guardrail: Ensure gateway "injected" assistant transcript messages are appended via SessionManager,
|
||||
// so they are attached to the current leaf with a `parentId` and do not sever compaction history.
|
||||
describe("gateway chat.inject transcript writes", () => {
|
||||
it("appends a Pi session entry that includes parentId", async () => {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-chat-inject-"));
|
||||
const transcriptPath = path.join(dir, "sess.jsonl");
|
||||
|
||||
// Minimal Pi session header so SessionManager can open/append safely.
|
||||
fs.writeFileSync(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({
|
||||
type: "session",
|
||||
version: CURRENT_SESSION_VERSION,
|
||||
id: "sess-1",
|
||||
timestamp: new Date(0).toISOString(),
|
||||
cwd: "/tmp",
|
||||
})}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
const sessionId = "sess-1";
|
||||
const { transcriptPath } = createTranscriptFixtureSync({
|
||||
prefix: "openclaw-chat-inject-",
|
||||
sessionId,
|
||||
});
|
||||
|
||||
vi.doMock("../session-utils.js", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("../session-utils.js")>();
|
||||
return {
|
||||
...original,
|
||||
loadSessionEntry: () => ({
|
||||
storePath: path.join(dir, "sessions.json"),
|
||||
entry: {
|
||||
sessionId: "sess-1",
|
||||
sessionFile: transcriptPath,
|
||||
},
|
||||
}),
|
||||
loadSessionEntry: () =>
|
||||
createMockSessionEntry({
|
||||
transcriptPath,
|
||||
sessionId,
|
||||
canonicalKey: "k1",
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
|
||||
42
src/gateway/server-methods/chat.test-helpers.ts
Normal file
42
src/gateway/server-methods/chat.test-helpers.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
|
||||
export function createTranscriptFixtureSync(params: {
|
||||
prefix: string;
|
||||
sessionId: string;
|
||||
fileName?: string;
|
||||
}) {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), params.prefix));
|
||||
const transcriptPath = path.join(dir, params.fileName ?? "sess.jsonl");
|
||||
fs.writeFileSync(
|
||||
transcriptPath,
|
||||
`${JSON.stringify({
|
||||
type: "session",
|
||||
version: CURRENT_SESSION_VERSION,
|
||||
id: params.sessionId,
|
||||
timestamp: new Date(0).toISOString(),
|
||||
cwd: "/tmp",
|
||||
})}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
return { dir, transcriptPath };
|
||||
}
|
||||
|
||||
export function createMockSessionEntry(params: {
|
||||
transcriptPath: string;
|
||||
sessionId: string;
|
||||
canonicalKey?: string;
|
||||
cfg?: Record<string, unknown>;
|
||||
}) {
|
||||
return {
|
||||
cfg: params.cfg ?? {},
|
||||
storePath: path.join(path.dirname(params.transcriptPath), "sessions.json"),
|
||||
entry: {
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.transcriptPath,
|
||||
},
|
||||
canonicalKey: params.canonicalKey ?? "main",
|
||||
};
|
||||
}
|
||||
@@ -10,7 +10,10 @@ import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
|
||||
import { resolveSessionFilePath } from "../../config/sessions.js";
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { stripInlineDirectiveTagsForDisplay } from "../../utils/directive-tags.js";
|
||||
import {
|
||||
stripInlineDirectiveTagsForDisplay,
|
||||
stripInlineDirectiveTagsFromMessageForDisplay,
|
||||
} from "../../utils/directive-tags.js";
|
||||
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
|
||||
import {
|
||||
abortChatRunById,
|
||||
@@ -527,25 +530,6 @@ function nextChatSeq(context: { agentRunSeq: Map<string, number> }, runId: strin
|
||||
return next;
|
||||
}
|
||||
|
||||
function stripMessageDirectiveTags(
|
||||
message: Record<string, unknown> | undefined,
|
||||
): Record<string, unknown> | undefined {
|
||||
if (!message) {
|
||||
return message;
|
||||
}
|
||||
const content = message.content;
|
||||
if (!Array.isArray(content)) {
|
||||
return message;
|
||||
}
|
||||
const cleaned = content.map((part: Record<string, unknown>) => {
|
||||
if (part.type === "text" && typeof part.text === "string") {
|
||||
return { ...part, text: stripInlineDirectiveTagsForDisplay(part.text).text };
|
||||
}
|
||||
return part;
|
||||
});
|
||||
return { ...message, content: cleaned };
|
||||
}
|
||||
|
||||
function broadcastChatFinal(params: {
|
||||
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
|
||||
runId: string;
|
||||
@@ -558,7 +542,7 @@ function broadcastChatFinal(params: {
|
||||
sessionKey: params.sessionKey,
|
||||
seq,
|
||||
state: "final" as const,
|
||||
message: stripMessageDirectiveTags(params.message),
|
||||
message: stripInlineDirectiveTagsFromMessageForDisplay(params.message),
|
||||
};
|
||||
params.context.broadcast("chat", payload);
|
||||
params.context.nodeSendToSession(params.sessionKey, "chat", payload);
|
||||
@@ -1089,7 +1073,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionKey: rawSessionKey,
|
||||
seq: 0,
|
||||
state: "final" as const,
|
||||
message: stripMessageDirectiveTags(appended.message),
|
||||
message: stripInlineDirectiveTagsFromMessageForDisplay(appended.message),
|
||||
};
|
||||
context.broadcast("chat", chatPayload);
|
||||
context.nodeSendToSession(rawSessionKey, "chat", chatPayload);
|
||||
|
||||
@@ -51,11 +51,11 @@ describe("runCommandWithTimeout", () => {
|
||||
[
|
||||
process.execPath,
|
||||
"-e",
|
||||
'process.stdout.write("."); setTimeout(() => process.stdout.write("."), 30); setTimeout(() => process.exit(0), 60);',
|
||||
'process.stdout.write(".\\n"); const interval = setInterval(() => process.stdout.write(".\\n"), 1800); setTimeout(() => { clearInterval(interval); process.exit(0); }, 9000);',
|
||||
],
|
||||
{
|
||||
timeoutMs: 1_000,
|
||||
noOutputTimeoutMs: 500,
|
||||
timeoutMs: 15_000,
|
||||
noOutputTimeoutMs: 6_000,
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { stripInlineDirectiveTagsForDisplay } from "./directive-tags.js";
|
||||
import {
|
||||
stripInlineDirectiveTagsForDisplay,
|
||||
stripInlineDirectiveTagsFromMessageForDisplay,
|
||||
} from "./directive-tags.js";
|
||||
|
||||
describe("stripInlineDirectiveTagsForDisplay", () => {
|
||||
test("removes reply and audio directives", () => {
|
||||
@@ -23,3 +26,34 @@ describe("stripInlineDirectiveTagsForDisplay", () => {
|
||||
expect(result.text).toBe(input);
|
||||
});
|
||||
});
|
||||
|
||||
describe("stripInlineDirectiveTagsFromMessageForDisplay", () => {
|
||||
test("strips inline directives from text content blocks", () => {
|
||||
const input = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "hello [[reply_to_current]] world [[audio_as_voice]]" }],
|
||||
};
|
||||
const result = stripInlineDirectiveTagsFromMessageForDisplay(input);
|
||||
expect(result).toBeDefined();
|
||||
expect(result?.content).toEqual([{ type: "text", text: "hello world " }]);
|
||||
});
|
||||
|
||||
test("preserves empty-string text when directives are entire content", () => {
|
||||
const input = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "[[reply_to_current]]" }],
|
||||
};
|
||||
const result = stripInlineDirectiveTagsFromMessageForDisplay(input);
|
||||
expect(result).toBeDefined();
|
||||
expect(result?.content).toEqual([{ type: "text", text: "" }]);
|
||||
});
|
||||
|
||||
test("returns original message when content is not an array", () => {
|
||||
const input = {
|
||||
role: "assistant",
|
||||
content: "plain text",
|
||||
};
|
||||
const result = stripInlineDirectiveTagsFromMessageForDisplay(input);
|
||||
expect(result).toEqual(input);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -29,6 +29,17 @@ type StripInlineDirectiveTagsResult = {
|
||||
changed: boolean;
|
||||
};
|
||||
|
||||
type MessageTextPart = {
|
||||
type: "text";
|
||||
text: string;
|
||||
} & Record<string, unknown>;
|
||||
|
||||
type MessagePart = Record<string, unknown> | null | undefined;
|
||||
|
||||
export type DisplayMessageWithContent = {
|
||||
content?: unknown;
|
||||
} & Record<string, unknown>;
|
||||
|
||||
export function stripInlineDirectiveTagsForDisplay(text: string): StripInlineDirectiveTagsResult {
|
||||
if (!text) {
|
||||
return { text, changed: false };
|
||||
@@ -41,6 +52,36 @@ export function stripInlineDirectiveTagsForDisplay(text: string): StripInlineDir
|
||||
};
|
||||
}
|
||||
|
||||
function isMessageTextPart(part: MessagePart): part is MessageTextPart {
|
||||
return Boolean(part) && part?.type === "text" && typeof part.text === "string";
|
||||
}
|
||||
|
||||
/**
|
||||
* Strips inline directive tags from message text blocks while preserving message shape.
|
||||
* Empty post-strip text stays empty-string to preserve caller semantics.
|
||||
*/
|
||||
export function stripInlineDirectiveTagsFromMessageForDisplay(
|
||||
message: DisplayMessageWithContent | undefined,
|
||||
): DisplayMessageWithContent | undefined {
|
||||
if (!message) {
|
||||
return message;
|
||||
}
|
||||
if (!Array.isArray(message.content)) {
|
||||
return message;
|
||||
}
|
||||
const cleaned = message.content.map((part) => {
|
||||
if (!part || typeof part !== "object") {
|
||||
return part;
|
||||
}
|
||||
const record = part as MessagePart;
|
||||
if (!isMessageTextPart(record)) {
|
||||
return part;
|
||||
}
|
||||
return { ...record, text: stripInlineDirectiveTagsForDisplay(record.text).text };
|
||||
});
|
||||
return { ...message, content: cleaned };
|
||||
}
|
||||
|
||||
export function parseInlineDirectives(
|
||||
text?: string,
|
||||
options: InlineDirectiveParseOptions = {},
|
||||
|
||||
Reference in New Issue
Block a user