perf(test): consolidate agent runner suites

This commit is contained in:
Peter Steinberger
2026-02-14 20:22:16 +00:00
parent 0b20ee2722
commit 42ab5dd2d1
12 changed files with 1131 additions and 1226 deletions

View File

@@ -1,135 +0,0 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import * as sessions from "../../config/sessions.js";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
it("resets corrupted Gemini sessions and deletes transcripts", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-reset-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
const sessionId = "session-corrupt";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "bad", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
"function call turn comes immediately after a user turn or after a function response turn",
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Session history was corrupted"),
});
expect(sessionStore.main).toBeUndefined();
await expect(fs.access(transcriptPath)).rejects.toThrow();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main).toBeUndefined();
} finally {
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
it("keeps sessions intact on other errors", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-noreset-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
const sessionId = "session-ok";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error("INVALID_ARGUMENT: some other failure");
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Agent failed before reply"),
});
expect(sessionStore.main).toBeDefined();
await expect(fs.access(transcriptPath)).resolves.toBeUndefined();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main).toBeDefined();
} finally {
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
it("returns friendly message for role ordering errors thrown as exceptions", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error("400 Incorrect role information");
});
const { run } = createMinimalRun({});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
expect(res).toMatchObject({
text: expect.not.stringContaining("400"),
});
});
it("returns friendly message for 'roles must alternate' errors thrown as exceptions", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error('messages: roles must alternate between "user" and "assistant"');
});
const { run } = createMinimalRun({});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
});
});

View File

@@ -1,172 +0,0 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import * as sessions from "../../config/sessions.js";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
it("retries after compaction failure by resetting the session", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-compaction-reset-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Context limit exceeded during compaction"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
} finally {
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
it("retries after context overflow payload by resetting the session", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-overflow-reset-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [{ text: "Context overflow: prompt too large", isError: true }],
meta: {
durationMs: 1,
error: {
kind: "context_overflow",
message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
},
},
}));
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Context limit exceeded"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
} finally {
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
it("resets the session after role ordering payloads", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-role-ordering-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [{ text: "Message ordering conflict - please try again.", isError: true }],
meta: {
durationMs: 1,
error: {
kind: "role_ordering",
message: 'messages: roles must alternate between "user" and "assistant"',
},
},
}));
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
await expect(fs.access(transcriptPath)).rejects.toBeDefined();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
} finally {
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
});

View File

@@ -1,107 +0,0 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
it("signals typing on block replies", async () => {
const onBlockReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onBlockReply?.({ text: "chunk", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
blockStreamingEnabled: true,
opts: { onBlockReply },
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("chunk");
expect(onBlockReply).toHaveBeenCalled();
const [blockPayload, blockOpts] = onBlockReply.mock.calls[0] ?? [];
expect(blockPayload).toMatchObject({ text: "chunk", audioAsVoice: false });
expect(blockOpts).toMatchObject({
abortSignal: expect.any(AbortSignal),
timeoutMs: expect.any(Number),
});
});
it("signals typing on tool results", async () => {
const onToolResult = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onToolResult?.({ text: "tooling", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
opts: { onToolResult },
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("tooling");
expect(onToolResult).toHaveBeenCalledWith({
text: "tooling",
mediaUrls: [],
});
});
it("skips typing for silent tool results", async () => {
const onToolResult = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onToolResult?.({ text: "NO_REPLY", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
opts: { onToolResult },
});
await run();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(onToolResult).not.toHaveBeenCalled();
});
it("announces auto-compaction in verbose mode and tracks count", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")),
"sessions.json",
);
const sessionEntry = { sessionId: "session", updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: {
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
}) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [{ text: "final" }], meta: {} };
},
);
const { run } = createMinimalRun({
resolvedVerboseLevel: "on",
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(Array.isArray(res)).toBe(true);
const payloads = res as { text?: string }[];
expect(payloads[0]?.text).toContain("Auto-compaction complete");
expect(payloads[0]?.text).toContain("count 1");
expect(sessionStore.main.compactionCount).toBe(1);
});
});

View File

@@ -1,127 +0,0 @@
import { describe, expect, it, vi } from "vitest";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
it("signals typing for normal runs", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
expect(typing.startTypingLoop).toHaveBeenCalled();
});
it("signals typing even without consumer partial handler", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("never signals typing for heartbeat runs", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: true, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("suppresses partial streaming for NO_REPLY", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onPartialReply?.({ text: "NO_REPLY" });
return { payloads: [{ text: "NO_REPLY" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
typingMode: "message",
});
await run();
expect(onPartialReply).not.toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("does not start typing on assistant message start without prior text in message mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => {
await params.onAssistantMessageStart?.();
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
});
await run();
// Typing only starts when there's actual renderable text, not on message start alone
expect(typing.startTypingLoop).not.toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
});
it("starts typing from reasoning stream in thinking mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: {
onPartialReply?: (payload: { text?: string }) => Promise<void> | void;
onReasoningStream?: (payload: { text?: string }) => Promise<void> | void;
}) => {
await params.onReasoningStream?.({ text: "Reasoning:\n_step_" });
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
},
);
const { run, typing } = createMinimalRun({
typingMode: "thinking",
});
await run();
expect(typing.startTypingLoop).toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
});
it("suppresses typing in never mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: { onPartialReply?: (payload: { text?: string }) => void }) => {
params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
},
);
const { run, typing } = createMinimalRun({
typingMode: "never",
});
await run();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
});

View File

@@ -1,78 +0,0 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import * as sessions from "../../config/sessions.js";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
it("still replies even if session reset fails to persist", async () => {
const prevStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-reset-fail-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
const saveSpy = vi.spyOn(sessions, "saveSessionStore").mockRejectedValueOnce(new Error("boom"));
try {
const sessionId = "session-corrupt";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "bad", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
"function call turn comes immediately after a user turn or after a function response turn",
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Session history was corrupted"),
});
expect(sessionStore.main).toBeUndefined();
await expect(fs.access(transcriptPath)).rejects.toThrow();
} finally {
saveSpy.mockRestore();
if (prevStateDir) {
process.env.OPENCLAW_STATE_DIR = prevStateDir;
} else {
delete process.env.OPENCLAW_STATE_DIR;
}
}
});
it("rewrites Bun socket errors into friendly text", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [
{
text: "TypeError: The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()",
isError: true,
},
],
meta: {},
}));
const { run } = createMinimalRun();
const res = await run();
const payloads = Array.isArray(res) ? res : res ? [res] : [];
expect(payloads.length).toBe(1);
expect(payloads[0]?.text).toContain("LLM connection failed");
expect(payloads[0]?.text).toContain("socket connection was closed unexpectedly");
expect(payloads[0]?.text).toContain("```");
});
});

View File

@@ -0,0 +1,569 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import * as sessions from "../../config/sessions.js";
import {
createMinimalRun,
getRunEmbeddedPiAgentMock,
installRunReplyAgentTypingHeartbeatTestHooks,
} from "./agent-runner.heartbeat-typing.test-harness.js";
type AgentRunParams = {
onPartialReply?: (payload: { text?: string }) => Promise<void> | void;
onAssistantMessageStart?: () => Promise<void> | void;
onReasoningStream?: (payload: { text?: string }) => Promise<void> | void;
onBlockReply?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void> | void;
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void> | void;
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
};
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
let fixtureRoot = "";
let caseId = 0;
type StateEnvSnapshot = {
OPENCLAW_STATE_DIR: string | undefined;
};
function snapshotStateEnv(): StateEnvSnapshot {
return { OPENCLAW_STATE_DIR: process.env.OPENCLAW_STATE_DIR };
}
function restoreStateEnv(snapshot: StateEnvSnapshot) {
if (snapshot.OPENCLAW_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = snapshot.OPENCLAW_STATE_DIR;
}
}
async function withTempStateDir<T>(fn: (stateDir: string) => Promise<T>): Promise<T> {
const stateDir = path.join(fixtureRoot, `case-${++caseId}`);
await fs.mkdir(stateDir, { recursive: true });
const envSnapshot = snapshotStateEnv();
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
return await fn(stateDir);
} finally {
restoreStateEnv(envSnapshot);
}
}
describe("runReplyAgent typing (heartbeat)", () => {
installRunReplyAgentTypingHeartbeatTestHooks();
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(tmpdir(), "openclaw-typing-heartbeat-"));
});
afterAll(async () => {
if (fixtureRoot) {
await fs.rm(fixtureRoot, { recursive: true, force: true });
}
});
beforeEach(() => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
});
it("signals typing for normal runs", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
expect(typing.startTypingLoop).toHaveBeenCalled();
});
it("signals typing even without consumer partial handler", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("never signals typing for heartbeat runs", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: true, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("suppresses partial streaming for NO_REPLY", async () => {
const onPartialReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "NO_REPLY" });
return { payloads: [{ text: "NO_REPLY" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
typingMode: "message",
});
await run();
expect(onPartialReply).not.toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("does not start typing on assistant message start without prior text in message mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onAssistantMessageStart?.();
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
});
await run();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
});
it("starts typing from reasoning stream in thinking mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onReasoningStream?.({ text: "Reasoning:\n_step_" });
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "thinking",
});
await run();
expect(typing.startTypingLoop).toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
});
it("suppresses typing in never mode", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "never",
});
await run();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("signals typing on block replies", async () => {
const onBlockReply = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onBlockReply?.({ text: "chunk", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
blockStreamingEnabled: true,
opts: { onBlockReply },
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("chunk");
expect(onBlockReply).toHaveBeenCalled();
const [blockPayload, blockOpts] = onBlockReply.mock.calls[0] ?? [];
expect(blockPayload).toMatchObject({ text: "chunk", audioAsVoice: false });
expect(blockOpts).toMatchObject({
abortSignal: expect.any(AbortSignal),
timeoutMs: expect.any(Number),
});
});
it("signals typing on tool results", async () => {
const onToolResult = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onToolResult?.({ text: "tooling", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
opts: { onToolResult },
});
await run();
expect(typing.startTypingOnText).toHaveBeenCalledWith("tooling");
expect(onToolResult).toHaveBeenCalledWith({
text: "tooling",
mediaUrls: [],
});
});
it("skips typing for silent tool results", async () => {
const onToolResult = vi.fn();
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onToolResult?.({ text: "NO_REPLY", mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const { run, typing } = createMinimalRun({
typingMode: "message",
opts: { onToolResult },
});
await run();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(onToolResult).not.toHaveBeenCalled();
});
it("announces auto-compaction in verbose mode and tracks count", async () => {
await withTempStateDir(async (stateDir) => {
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId: "session", updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const { run } = createMinimalRun({
resolvedVerboseLevel: "on",
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(Array.isArray(res)).toBe(true);
const payloads = res as { text?: string }[];
expect(payloads[0]?.text).toContain("Auto-compaction complete");
expect(payloads[0]?.text).toContain("count 1");
expect(sessionStore.main.compactionCount).toBe(1);
});
});
it("retries after compaction failure by resetting the session", async () => {
await withTempStateDir(async (stateDir) => {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Context limit exceeded during compaction"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
});
});
it("retries after context overflow payload by resetting the session", async () => {
await withTempStateDir(async (stateDir) => {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [{ text: "Context overflow: prompt too large", isError: true }],
meta: {
durationMs: 1,
error: {
kind: "context_overflow",
message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
},
},
}));
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Context limit exceeded"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
});
});
it("resets the session after role ordering payloads", async () => {
await withTempStateDir(async (stateDir) => {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [{ text: "Message ordering conflict - please try again.", isError: true }],
meta: {
durationMs: 1,
error: {
kind: "role_ordering",
message: 'messages: roles must alternate between "user" and "assistant"',
},
},
}));
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
await expect(fs.access(transcriptPath)).rejects.toBeDefined();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
});
});
it("resets corrupted Gemini sessions and deletes transcripts", async () => {
await withTempStateDir(async (stateDir) => {
const sessionId = "session-corrupt";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "bad", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
"function call turn comes immediately after a user turn or after a function response turn",
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Session history was corrupted"),
});
expect(sessionStore.main).toBeUndefined();
await expect(fs.access(transcriptPath)).rejects.toThrow();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main).toBeUndefined();
});
});
it("keeps sessions intact on other errors", async () => {
await withTempStateDir(async (stateDir) => {
const sessionId = "session-ok";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "ok", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error("INVALID_ARGUMENT: some other failure");
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Agent failed before reply"),
});
expect(sessionStore.main).toBeDefined();
await expect(fs.access(transcriptPath)).resolves.toBeUndefined();
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main).toBeDefined();
});
});
it("still replies even if session reset fails to persist", async () => {
await withTempStateDir(async (stateDir) => {
const saveSpy = vi
.spyOn(sessions, "saveSessionStore")
.mockRejectedValueOnce(new Error("boom"));
try {
const sessionId = "session-corrupt";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(transcriptPath, "bad", "utf-8");
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error(
"function call turn comes immediately after a user turn or after a function response turn",
);
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Session history was corrupted"),
});
expect(sessionStore.main).toBeUndefined();
await expect(fs.access(transcriptPath)).rejects.toThrow();
} finally {
saveSpy.mockRestore();
}
});
});
it("returns friendly message for role ordering errors thrown as exceptions", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error("400 Incorrect role information");
});
const { run } = createMinimalRun({});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
expect(res).toMatchObject({
text: expect.not.stringContaining("400"),
});
});
it("returns friendly message for 'roles must alternate' errors thrown as exceptions", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
throw new Error('messages: roles must alternate between "user" and "assistant"');
});
const { run } = createMinimalRun({});
const res = await run();
expect(res).toMatchObject({
text: expect.stringContaining("Message ordering conflict"),
});
});
it("rewrites Bun socket errors into friendly text", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [
{
text: "TypeError: The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()",
isError: true,
},
],
meta: {},
}));
const { run } = createMinimalRun();
const res = await run();
const payloads = Array.isArray(res) ? res : res ? [res] : [];
expect(payloads.length).toBe(1);
expect(payloads[0]?.text).toContain("LLM connection failed");
expect(payloads[0]?.text).toContain("socket connection was closed unexpectedly");
expect(payloads[0]?.text).toContain("```");
});
});

View File

@@ -1,78 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
createBaseRun,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js";
describe("runReplyAgent memory flush", () => {
it("increments compaction count when flush compaction completes", async () => {
const { runReplyAgent } = await import("./agent-runner.js");
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].compactionCount).toBe(2);
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2);
});
});

View File

@@ -1,145 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeAll, describe, expect, it } from "vitest";
import {
createBaseRun,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js";
let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent;
beforeAll(async () => {
({ runReplyAgent } = await import("./agent-runner.js"));
});
describe("runReplyAgent memory flush", () => {
it("runs a memory flush turn and updates session metadata", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual([DEFAULT_MEMORY_FLUSH_PROMPT, "hello"]);
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number");
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1);
});
it("skips memory flush when disabled in config", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockImplementation(async (_params: EmbeddedRunParams) => ({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
}));
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: { compaction: { memoryFlush: { enabled: false } } },
},
},
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const call = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined;
expect(call?.prompt).toBe("hello");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeUndefined();
});
});

View File

@@ -1,81 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
createBaseRun,
getRunCliAgentMock,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
describe("runReplyAgent memory flush", () => {
it("skips memory flush for CLI providers", async () => {
const { runReplyAgent } = await import("./agent-runner.js");
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
const runCliAgentMock = getRunCliAgentMock();
runEmbeddedPiAgentMock.mockReset();
runCliAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
runCliAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
runOverrides: { provider: "codex-cli" },
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(runCliAgentMock).toHaveBeenCalledTimes(1);
const call = runCliAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined;
expect(call?.prompt).toBe("hello");
expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
});

View File

@@ -1,148 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeAll, describe, expect, it } from "vitest";
import {
createBaseRun,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent;
beforeAll(async () => {
({ runReplyAgent } = await import("./agent-runner.js"));
});
describe("runReplyAgent memory flush", () => {
it("skips memory flush when the sandbox workspace is read-only", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
sandbox: { mode: "all", workspaceAccess: "ro" },
},
},
},
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeUndefined();
});
it("skips memory flush when the sandbox workspace is none", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
sandbox: { mode: "all", workspaceAccess: "none" },
},
},
},
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
});
});

View File

@@ -0,0 +1,562 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import {
createBaseRun,
getRunCliAgentMock,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js";
let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent;
let fixtureRoot = "";
let caseId = 0;
async function withTempStore<T>(fn: (storePath: string) => Promise<T>): Promise<T> {
const dir = path.join(fixtureRoot, `case-${++caseId}`);
await fs.mkdir(dir, { recursive: true });
return await fn(path.join(dir, "sessions.json"));
}
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-memory-flush-"));
({ runReplyAgent } = await import("./agent-runner.js"));
});
afterAll(async () => {
if (fixtureRoot) {
await fs.rm(fixtureRoot, { recursive: true, force: true });
}
});
describe("runReplyAgent memory flush", () => {
it("skips memory flush for CLI providers", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
const runCliAgentMock = getRunCliAgentMock();
runEmbeddedPiAgentMock.mockReset();
runCliAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockImplementation(async () => ({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
}));
runCliAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
runOverrides: { provider: "codex-cli" },
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(runCliAgentMock).toHaveBeenCalledTimes(1);
const call = runCliAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined;
expect(call?.prompt).toBe("hello");
expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
});
it("uses configured prompts for memory flush runs", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<EmbeddedRunParams> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push(params);
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
compaction: {
memoryFlush: {
prompt: "Write notes.",
systemPrompt: "Flush memory now.",
},
},
},
},
},
runOverrides: { extraSystemPrompt: "extra system" },
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const flushCall = calls[0];
expect(flushCall?.prompt).toContain("Write notes.");
expect(flushCall?.prompt).toContain("NO_REPLY");
expect(flushCall?.extraSystemPrompt).toContain("extra system");
expect(flushCall?.extraSystemPrompt).toContain("Flush memory now.");
expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY");
expect(calls[1]?.prompt).toBe("hello");
});
});
it("runs a memory flush turn and updates session metadata", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual([DEFAULT_MEMORY_FLUSH_PROMPT, "hello"]);
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number");
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1);
});
});
it("skips memory flush when disabled in config", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockImplementation(async () => ({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
}));
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: { agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } } },
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const call = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined;
expect(call?.prompt).toBe("hello");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeUndefined();
});
});
it("skips memory flush after a prior flush in the same compaction cycle", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 2,
memoryFlushCompactionCount: 2,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
});
});
it("skips memory flush when the sandbox workspace is read-only", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
sandbox: { mode: "all", workspaceAccess: "ro" },
},
},
},
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeUndefined();
});
});
it("skips memory flush when the sandbox workspace is none", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
sandbox: { mode: "all", workspaceAccess: "none" },
},
},
},
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
});
});
it("increments compaction count when flush compaction completes", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].compactionCount).toBe(2);
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2);
});
});
});

View File

@@ -1,155 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeAll, describe, expect, it } from "vitest";
import {
createBaseRun,
getRunEmbeddedPiAgentMock,
seedSessionStore,
type EmbeddedRunParams,
} from "./agent-runner.memory-flush.test-harness.js";
import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js";
let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent;
beforeAll(async () => {
({ runReplyAgent } = await import("./agent-runner.js"));
});
describe("runReplyAgent memory flush", () => {
it("uses configured prompts for memory flush runs", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<EmbeddedRunParams> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push(params);
if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) {
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config: {
agents: {
defaults: {
compaction: {
memoryFlush: {
prompt: "Write notes.",
systemPrompt: "Flush memory now.",
},
},
},
},
},
runOverrides: { extraSystemPrompt: "extra system" },
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const flushCall = calls[0];
expect(flushCall?.prompt).toContain("Write notes.");
expect(flushCall?.prompt).toContain("NO_REPLY");
expect(flushCall?.extraSystemPrompt).toContain("extra system");
expect(flushCall?.extraSystemPrompt).toContain("Flush memory now.");
expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY");
expect(calls[1]?.prompt).toBe("hello");
});
it("skips memory flush after a prior flush in the same compaction cycle", async () => {
const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock();
runEmbeddedPiAgentMock.mockReset();
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 2,
memoryFlushCompactionCount: 2,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
const calls: Array<{ prompt?: string }> = [];
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({ prompt: params.prompt });
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
expect(calls.map((call) => call.prompt)).toEqual(["hello"]);
});
});