diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.resets-corrupted-gemini-sessions-deletes-transcripts.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.resets-corrupted-gemini-sessions-deletes-transcripts.test.ts deleted file mode 100644 index c115a59f0..000000000 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.resets-corrupted-gemini-sessions-deletes-transcripts.test.ts +++ /dev/null @@ -1,135 +0,0 @@ -import fs from "node:fs/promises"; -import { tmpdir } from "node:os"; -import path from "node:path"; -import { describe, expect, it } from "vitest"; -import * as sessions from "../../config/sessions.js"; -import { - createMinimalRun, - getRunEmbeddedPiAgentMock, - installRunReplyAgentTypingHeartbeatTestHooks, -} from "./agent-runner.heartbeat-typing.test-harness.js"; -const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - -describe("runReplyAgent typing (heartbeat)", () => { - installRunReplyAgentTypingHeartbeatTestHooks(); - - it("resets corrupted Gemini sessions and deletes transcripts", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-reset-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - try { - const sessionId = "session-corrupt"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const sessionEntry = { sessionId, updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "bad", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - "function call turn comes immediately after a user turn or after a function response turn", - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Session history was corrupted"), - }); - expect(sessionStore.main).toBeUndefined(); - await expect(fs.access(transcriptPath)).rejects.toThrow(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main).toBeUndefined(); - } finally { - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); - it("keeps sessions intact on other errors", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-noreset-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - try { - const sessionId = "session-ok"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const sessionEntry = { sessionId, updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error("INVALID_ARGUMENT: some other failure"); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Agent failed before reply"), - }); - expect(sessionStore.main).toBeDefined(); - await expect(fs.access(transcriptPath)).resolves.toBeUndefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main).toBeDefined(); - } finally { - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); - it("returns friendly message for role ordering errors thrown as exceptions", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error("400 Incorrect role information"); - }); - - const { run } = createMinimalRun({}); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Message ordering conflict"), - }); - expect(res).toMatchObject({ - text: expect.not.stringContaining("400"), - }); - }); - it("returns friendly message for 'roles must alternate' errors thrown as exceptions", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error('messages: roles must alternate between "user" and "assistant"'); - }); - - const { run } = createMinimalRun({}); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Message ordering conflict"), - }); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.retries-after-compaction-failure-by-resetting-session.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.retries-after-compaction-failure-by-resetting-session.test.ts deleted file mode 100644 index 65184e579..000000000 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.retries-after-compaction-failure-by-resetting-session.test.ts +++ /dev/null @@ -1,172 +0,0 @@ -import fs from "node:fs/promises"; -import { tmpdir } from "node:os"; -import path from "node:path"; -import { describe, expect, it } from "vitest"; -import * as sessions from "../../config/sessions.js"; -import { - createMinimalRun, - getRunEmbeddedPiAgentMock, - installRunReplyAgentTypingHeartbeatTestHooks, -} from "./agent-runner.heartbeat-typing.test-harness.js"; -const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - -describe("runReplyAgent typing (heartbeat)", () => { - installRunReplyAgentTypingHeartbeatTestHooks(); - - it("retries after compaction failure by resetting the session", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-compaction-reset-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - try { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Context limit exceeded during compaction"), - }); - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - } finally { - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); - - it("retries after context overflow payload by resetting the session", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-overflow-reset-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - try { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [{ text: "Context overflow: prompt too large", isError: true }], - meta: { - durationMs: 1, - error: { - kind: "context_overflow", - message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', - }, - }, - })); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Context limit exceeded"), - }); - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - } finally { - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); - - it("resets the session after role ordering payloads", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-role-ordering-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - try { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [{ text: "Message ordering conflict - please try again.", isError: true }], - meta: { - durationMs: 1, - error: { - kind: "role_ordering", - message: 'messages: roles must alternate between "user" and "assistant"', - }, - }, - })); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Message ordering conflict"), - }); - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - await expect(fs.access(transcriptPath)).rejects.toBeDefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - } finally { - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); -}); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-block-replies.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-block-replies.test.ts deleted file mode 100644 index 10cd56699..000000000 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-block-replies.test.ts +++ /dev/null @@ -1,107 +0,0 @@ -import fs from "node:fs/promises"; -import { tmpdir } from "node:os"; -import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; -import { - createMinimalRun, - getRunEmbeddedPiAgentMock, - installRunReplyAgentTypingHeartbeatTestHooks, -} from "./agent-runner.heartbeat-typing.test-harness.js"; -const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - -describe("runReplyAgent typing (heartbeat)", () => { - installRunReplyAgentTypingHeartbeatTestHooks(); - - it("signals typing on block replies", async () => { - const onBlockReply = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onBlockReply?.({ text: "chunk", mediaUrls: [] }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - typingMode: "message", - blockStreamingEnabled: true, - opts: { onBlockReply }, - }); - await run(); - - expect(typing.startTypingOnText).toHaveBeenCalledWith("chunk"); - expect(onBlockReply).toHaveBeenCalled(); - const [blockPayload, blockOpts] = onBlockReply.mock.calls[0] ?? []; - expect(blockPayload).toMatchObject({ text: "chunk", audioAsVoice: false }); - expect(blockOpts).toMatchObject({ - abortSignal: expect.any(AbortSignal), - timeoutMs: expect.any(Number), - }); - }); - it("signals typing on tool results", async () => { - const onToolResult = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onToolResult?.({ text: "tooling", mediaUrls: [] }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - typingMode: "message", - opts: { onToolResult }, - }); - await run(); - - expect(typing.startTypingOnText).toHaveBeenCalledWith("tooling"); - expect(onToolResult).toHaveBeenCalledWith({ - text: "tooling", - mediaUrls: [], - }); - }); - it("skips typing for silent tool results", async () => { - const onToolResult = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onToolResult?.({ text: "NO_REPLY", mediaUrls: [] }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - typingMode: "message", - opts: { onToolResult }, - }); - await run(); - - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - expect(onToolResult).not.toHaveBeenCalled(); - }); - it("announces auto-compaction in verbose mode and tracks count", async () => { - const storePath = path.join( - await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")), - "sessions.json", - ); - const sessionEntry = { sessionId: "session", updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - runEmbeddedPiAgentMock.mockImplementationOnce( - async (params: { - onAgentEvent?: (evt: { stream: string; data: Record }) => void; - }) => { - params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "end", willRetry: false }, - }); - return { payloads: [{ text: "final" }], meta: {} }; - }, - ); - - const { run } = createMinimalRun({ - resolvedVerboseLevel: "on", - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - expect(Array.isArray(res)).toBe(true); - const payloads = res as { text?: string }[]; - expect(payloads[0]?.text).toContain("Auto-compaction complete"); - expect(payloads[0]?.text).toContain("count 1"); - expect(sessionStore.main.compactionCount).toBe(1); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-normal-runs.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-normal-runs.test.ts deleted file mode 100644 index 575456fee..000000000 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.signals-typing-normal-runs.test.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; -import { - createMinimalRun, - getRunEmbeddedPiAgentMock, - installRunReplyAgentTypingHeartbeatTestHooks, -} from "./agent-runner.heartbeat-typing.test-harness.js"; - -const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - -describe("runReplyAgent typing (heartbeat)", () => { - installRunReplyAgentTypingHeartbeatTestHooks(); - - it("signals typing for normal runs", async () => { - const onPartialReply = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onPartialReply?.({ text: "hi" }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - opts: { isHeartbeat: false, onPartialReply }, - }); - await run(); - - expect(onPartialReply).toHaveBeenCalled(); - expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); - expect(typing.startTypingLoop).toHaveBeenCalled(); - }); - it("signals typing even without consumer partial handler", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onPartialReply?.({ text: "hi" }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - typingMode: "message", - }); - await run(); - - expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); - expect(typing.startTypingLoop).not.toHaveBeenCalled(); - }); - it("never signals typing for heartbeat runs", async () => { - const onPartialReply = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onPartialReply?.({ text: "hi" }); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - opts: { isHeartbeat: true, onPartialReply }, - }); - await run(); - - expect(onPartialReply).toHaveBeenCalled(); - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - expect(typing.startTypingLoop).not.toHaveBeenCalled(); - }); - it("suppresses partial streaming for NO_REPLY", async () => { - const onPartialReply = vi.fn(); - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onPartialReply?.({ text: "NO_REPLY" }); - return { payloads: [{ text: "NO_REPLY" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - opts: { isHeartbeat: false, onPartialReply }, - typingMode: "message", - }); - await run(); - - expect(onPartialReply).not.toHaveBeenCalled(); - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - expect(typing.startTypingLoop).not.toHaveBeenCalled(); - }); - it("does not start typing on assistant message start without prior text in message mode", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedPiAgentParams) => { - await params.onAssistantMessageStart?.(); - return { payloads: [{ text: "final" }], meta: {} }; - }); - - const { run, typing } = createMinimalRun({ - typingMode: "message", - }); - await run(); - - // Typing only starts when there's actual renderable text, not on message start alone - expect(typing.startTypingLoop).not.toHaveBeenCalled(); - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - }); - it("starts typing from reasoning stream in thinking mode", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce( - async (params: { - onPartialReply?: (payload: { text?: string }) => Promise | void; - onReasoningStream?: (payload: { text?: string }) => Promise | void; - }) => { - await params.onReasoningStream?.({ text: "Reasoning:\n_step_" }); - await params.onPartialReply?.({ text: "hi" }); - return { payloads: [{ text: "final" }], meta: {} }; - }, - ); - - const { run, typing } = createMinimalRun({ - typingMode: "thinking", - }); - await run(); - - expect(typing.startTypingLoop).toHaveBeenCalled(); - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - }); - it("suppresses typing in never mode", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce( - async (params: { onPartialReply?: (payload: { text?: string }) => void }) => { - params.onPartialReply?.({ text: "hi" }); - return { payloads: [{ text: "final" }], meta: {} }; - }, - ); - - const { run, typing } = createMinimalRun({ - typingMode: "never", - }); - await run(); - - expect(typing.startTypingOnText).not.toHaveBeenCalled(); - expect(typing.startTypingLoop).not.toHaveBeenCalled(); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.still-replies-even-if-session-reset-fails.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.still-replies-even-if-session-reset-fails.test.ts deleted file mode 100644 index 8082db09d..000000000 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.still-replies-even-if-session-reset-fails.test.ts +++ /dev/null @@ -1,78 +0,0 @@ -import fs from "node:fs/promises"; -import { tmpdir } from "node:os"; -import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; -import * as sessions from "../../config/sessions.js"; -import { - createMinimalRun, - getRunEmbeddedPiAgentMock, - installRunReplyAgentTypingHeartbeatTestHooks, -} from "./agent-runner.heartbeat-typing.test-harness.js"; -const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - -describe("runReplyAgent typing (heartbeat)", () => { - installRunReplyAgentTypingHeartbeatTestHooks(); - - it("still replies even if session reset fails to persist", async () => { - const prevStateDir = process.env.OPENCLAW_STATE_DIR; - const stateDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-session-reset-fail-")); - process.env.OPENCLAW_STATE_DIR = stateDir; - const saveSpy = vi.spyOn(sessions, "saveSessionStore").mockRejectedValueOnce(new Error("boom")); - try { - const sessionId = "session-corrupt"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const sessionEntry = { sessionId, updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "bad", "utf-8"); - - runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - "function call turn comes immediately after a user turn or after a function response turn", - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Session history was corrupted"), - }); - expect(sessionStore.main).toBeUndefined(); - await expect(fs.access(transcriptPath)).rejects.toThrow(); - } finally { - saveSpy.mockRestore(); - if (prevStateDir) { - process.env.OPENCLAW_STATE_DIR = prevStateDir; - } else { - delete process.env.OPENCLAW_STATE_DIR; - } - } - }); - it("rewrites Bun socket errors into friendly text", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [ - { - text: "TypeError: The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()", - isError: true, - }, - ], - meta: {}, - })); - - const { run } = createMinimalRun(); - const res = await run(); - const payloads = Array.isArray(res) ? res : res ? [res] : []; - expect(payloads.length).toBe(1); - expect(payloads[0]?.text).toContain("LLM connection failed"); - expect(payloads[0]?.text).toContain("socket connection was closed unexpectedly"); - expect(payloads[0]?.text).toContain("```"); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.test.ts new file mode 100644 index 000000000..a6f585ddf --- /dev/null +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.runreplyagent-typing-heartbeat.test.ts @@ -0,0 +1,569 @@ +import fs from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import * as sessions from "../../config/sessions.js"; +import { + createMinimalRun, + getRunEmbeddedPiAgentMock, + installRunReplyAgentTypingHeartbeatTestHooks, +} from "./agent-runner.heartbeat-typing.test-harness.js"; + +type AgentRunParams = { + onPartialReply?: (payload: { text?: string }) => Promise | void; + onAssistantMessageStart?: () => Promise | void; + onReasoningStream?: (payload: { text?: string }) => Promise | void; + onBlockReply?: (payload: { text?: string; mediaUrls?: string[] }) => Promise | void; + onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => Promise | void; + onAgentEvent?: (evt: { stream: string; data: Record }) => void; +}; + +const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + +let fixtureRoot = ""; +let caseId = 0; + +type StateEnvSnapshot = { + OPENCLAW_STATE_DIR: string | undefined; +}; + +function snapshotStateEnv(): StateEnvSnapshot { + return { OPENCLAW_STATE_DIR: process.env.OPENCLAW_STATE_DIR }; +} + +function restoreStateEnv(snapshot: StateEnvSnapshot) { + if (snapshot.OPENCLAW_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = snapshot.OPENCLAW_STATE_DIR; + } +} + +async function withTempStateDir(fn: (stateDir: string) => Promise): Promise { + const stateDir = path.join(fixtureRoot, `case-${++caseId}`); + await fs.mkdir(stateDir, { recursive: true }); + const envSnapshot = snapshotStateEnv(); + process.env.OPENCLAW_STATE_DIR = stateDir; + try { + return await fn(stateDir); + } finally { + restoreStateEnv(envSnapshot); + } +} + +describe("runReplyAgent typing (heartbeat)", () => { + installRunReplyAgentTypingHeartbeatTestHooks(); + + beforeAll(async () => { + fixtureRoot = await fs.mkdtemp(path.join(tmpdir(), "openclaw-typing-heartbeat-")); + }); + + afterAll(async () => { + if (fixtureRoot) { + await fs.rm(fixtureRoot, { recursive: true, force: true }); + } + }); + + beforeEach(() => { + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + }); + + it("signals typing for normal runs", async () => { + const onPartialReply = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: false, onPartialReply }, + }); + await run(); + + expect(onPartialReply).toHaveBeenCalled(); + expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); + expect(typing.startTypingLoop).toHaveBeenCalled(); + }); + + it("signals typing even without consumer partial handler", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + }); + await run(); + + expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("never signals typing for heartbeat runs", async () => { + const onPartialReply = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: true, onPartialReply }, + }); + await run(); + + expect(onPartialReply).toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("suppresses partial streaming for NO_REPLY", async () => { + const onPartialReply = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "NO_REPLY" }); + return { payloads: [{ text: "NO_REPLY" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: false, onPartialReply }, + typingMode: "message", + }); + await run(); + + expect(onPartialReply).not.toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("does not start typing on assistant message start without prior text in message mode", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onAssistantMessageStart?.(); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + }); + await run(); + + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + }); + + it("starts typing from reasoning stream in thinking mode", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onReasoningStream?.({ text: "Reasoning:\n_step_" }); + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "thinking", + }); + await run(); + + expect(typing.startTypingLoop).toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + }); + + it("suppresses typing in never mode", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "never", + }); + await run(); + + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("signals typing on block replies", async () => { + const onBlockReply = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onBlockReply?.({ text: "chunk", mediaUrls: [] }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + blockStreamingEnabled: true, + opts: { onBlockReply }, + }); + await run(); + + expect(typing.startTypingOnText).toHaveBeenCalledWith("chunk"); + expect(onBlockReply).toHaveBeenCalled(); + const [blockPayload, blockOpts] = onBlockReply.mock.calls[0] ?? []; + expect(blockPayload).toMatchObject({ text: "chunk", audioAsVoice: false }); + expect(blockOpts).toMatchObject({ + abortSignal: expect.any(AbortSignal), + timeoutMs: expect.any(Number), + }); + }); + + it("signals typing on tool results", async () => { + const onToolResult = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onToolResult?.({ text: "tooling", mediaUrls: [] }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + opts: { onToolResult }, + }); + await run(); + + expect(typing.startTypingOnText).toHaveBeenCalledWith("tooling"); + expect(onToolResult).toHaveBeenCalledWith({ + text: "tooling", + mediaUrls: [], + }); + }); + + it("skips typing for silent tool results", async () => { + const onToolResult = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onToolResult?.({ text: "NO_REPLY", mediaUrls: [] }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + opts: { onToolResult }, + }); + await run(); + + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + expect(onToolResult).not.toHaveBeenCalled(); + }); + + it("announces auto-compaction in verbose mode and tracks count", async () => { + await withTempStateDir(async (stateDir) => { + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId: "session", updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run } = createMinimalRun({ + resolvedVerboseLevel: "on", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + expect(Array.isArray(res)).toBe(true); + const payloads = res as { text?: string }[]; + expect(payloads[0]?.text).toContain("Auto-compaction complete"); + expect(payloads[0]?.text).toContain("count 1"); + expect(sessionStore.main.compactionCount).toBe(1); + }); + }); + + it("retries after compaction failure by resetting the session", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + const payload = Array.isArray(res) ? res[0] : res; + expect(payload).toMatchObject({ + text: expect.stringContaining("Context limit exceeded during compaction"), + }); + expect(payload.text?.toLowerCase()).toContain("reset"); + expect(sessionStore.main.sessionId).not.toBe(sessionId); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); + }); + }); + + it("retries after context overflow payload by resetting the session", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ + payloads: [{ text: "Context overflow: prompt too large", isError: true }], + meta: { + durationMs: 1, + error: { + kind: "context_overflow", + message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', + }, + }, + })); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + const payload = Array.isArray(res) ? res[0] : res; + expect(payload).toMatchObject({ + text: expect.stringContaining("Context limit exceeded"), + }); + expect(payload.text?.toLowerCase()).toContain("reset"); + expect(sessionStore.main.sessionId).not.toBe(sessionId); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); + }); + }); + + it("resets the session after role ordering payloads", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ + payloads: [{ text: "Message ordering conflict - please try again.", isError: true }], + meta: { + durationMs: 1, + error: { + kind: "role_ordering", + message: 'messages: roles must alternate between "user" and "assistant"', + }, + }, + })); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + const payload = Array.isArray(res) ? res[0] : res; + expect(payload).toMatchObject({ + text: expect.stringContaining("Message ordering conflict"), + }); + expect(payload.text?.toLowerCase()).toContain("reset"); + expect(sessionStore.main.sessionId).not.toBe(sessionId); + await expect(fs.access(transcriptPath)).rejects.toBeDefined(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); + }); + }); + + it("resets corrupted Gemini sessions and deletes transcripts", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session-corrupt"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "bad", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + "function call turn comes immediately after a user turn or after a function response turn", + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Session history was corrupted"), + }); + expect(sessionStore.main).toBeUndefined(); + await expect(fs.access(transcriptPath)).rejects.toThrow(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main).toBeUndefined(); + }); + }); + + it("keeps sessions intact on other errors", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session-ok"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error("INVALID_ARGUMENT: some other failure"); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Agent failed before reply"), + }); + expect(sessionStore.main).toBeDefined(); + await expect(fs.access(transcriptPath)).resolves.toBeUndefined(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main).toBeDefined(); + }); + }); + + it("still replies even if session reset fails to persist", async () => { + await withTempStateDir(async (stateDir) => { + const saveSpy = vi + .spyOn(sessions, "saveSessionStore") + .mockRejectedValueOnce(new Error("boom")); + try { + const sessionId = "session-corrupt"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "bad", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + "function call turn comes immediately after a user turn or after a function response turn", + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Session history was corrupted"), + }); + expect(sessionStore.main).toBeUndefined(); + await expect(fs.access(transcriptPath)).rejects.toThrow(); + } finally { + saveSpy.mockRestore(); + } + }); + }); + + it("returns friendly message for role ordering errors thrown as exceptions", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error("400 Incorrect role information"); + }); + + const { run } = createMinimalRun({}); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Message ordering conflict"), + }); + expect(res).toMatchObject({ + text: expect.not.stringContaining("400"), + }); + }); + + it("returns friendly message for 'roles must alternate' errors thrown as exceptions", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error('messages: roles must alternate between "user" and "assistant"'); + }); + + const { run } = createMinimalRun({}); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Message ordering conflict"), + }); + }); + + it("rewrites Bun socket errors into friendly text", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ + payloads: [ + { + text: "TypeError: The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()", + isError: true, + }, + ], + meta: {}, + })); + + const { run } = createMinimalRun(); + const res = await run(); + const payloads = Array.isArray(res) ? res : res ? [res] : []; + expect(payloads.length).toBe(1); + expect(payloads[0]?.text).toContain("LLM connection failed"); + expect(payloads[0]?.text).toContain("socket connection was closed unexpectedly"); + expect(payloads[0]?.text).toContain("```"); + }); +}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.increments-compaction-count-flush-compaction-completes.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.increments-compaction-count-flush-compaction-completes.test.ts deleted file mode 100644 index 604938da9..000000000 --- a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.increments-compaction-count-flush-compaction-completes.test.ts +++ /dev/null @@ -1,78 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { describe, expect, it } from "vitest"; -import { - createBaseRun, - getRunEmbeddedPiAgentMock, - seedSessionStore, - type EmbeddedRunParams, -} from "./agent-runner.memory-flush.test-harness.js"; -import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js"; - -describe("runReplyAgent memory flush", () => { - it("increments compaction count when flush compaction completes", async () => { - const { runReplyAgent } = await import("./agent-runner.js"); - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { - params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "end", willRetry: false }, - }); - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].compactionCount).toBe(2); - expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.runs-memory-flush-turn-updates-session-metadata.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.runs-memory-flush-turn-updates-session-metadata.test.ts deleted file mode 100644 index 2b69cf5f2..000000000 --- a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.runs-memory-flush-turn-updates-session-metadata.test.ts +++ /dev/null @@ -1,145 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { beforeAll, describe, expect, it } from "vitest"; -import { - createBaseRun, - getRunEmbeddedPiAgentMock, - seedSessionStore, - type EmbeddedRunParams, -} from "./agent-runner.memory-flush.test-harness.js"; -import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js"; - -let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent; - -beforeAll(async () => { - ({ runReplyAgent } = await import("./agent-runner.js")); -}); - -describe("runReplyAgent memory flush", () => { - it("runs a memory flush turn and updates session metadata", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(calls.map((call) => call.prompt)).toEqual([DEFAULT_MEMORY_FLUSH_PROMPT, "hello"]); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); - expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1); - }); - it("skips memory flush when disabled in config", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - runEmbeddedPiAgentMock.mockImplementation(async (_params: EmbeddedRunParams) => ({ - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - })); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { compaction: { memoryFlush: { enabled: false } } }, - }, - }, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined; - expect(call?.prompt).toBe("hello"); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-cli-providers.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-cli-providers.test.ts deleted file mode 100644 index 39cfcacf4..000000000 --- a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-cli-providers.test.ts +++ /dev/null @@ -1,81 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { describe, expect, it } from "vitest"; -import { - createBaseRun, - getRunCliAgentMock, - getRunEmbeddedPiAgentMock, - seedSessionStore, - type EmbeddedRunParams, -} from "./agent-runner.memory-flush.test-harness.js"; - -describe("runReplyAgent memory flush", () => { - it("skips memory flush for CLI providers", async () => { - const { runReplyAgent } = await import("./agent-runner.js"); - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - const runCliAgentMock = getRunCliAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - runCliAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - runCliAgentMock.mockResolvedValue({ - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - runOverrides: { provider: "codex-cli" }, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(runCliAgentMock).toHaveBeenCalledTimes(1); - const call = runCliAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined; - expect(call?.prompt).toBe("hello"); - expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled(); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-sandbox-workspace-is-read.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-sandbox-workspace-is-read.test.ts deleted file mode 100644 index 642e3615a..000000000 --- a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.skips-memory-flush-sandbox-workspace-is-read.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { beforeAll, describe, expect, it } from "vitest"; -import { - createBaseRun, - getRunEmbeddedPiAgentMock, - seedSessionStore, - type EmbeddedRunParams, -} from "./agent-runner.memory-flush.test-harness.js"; - -let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent; - -beforeAll(async () => { - ({ runReplyAgent } = await import("./agent-runner.js")); -}); - -describe("runReplyAgent memory flush", () => { - it("skips memory flush when the sandbox workspace is read-only", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { - sandbox: { mode: "all", workspaceAccess: "ro" }, - }, - }, - }, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(calls.map((call) => call.prompt)).toEqual(["hello"]); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); - }); - it("skips memory flush when the sandbox workspace is none", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { - sandbox: { mode: "all", workspaceAccess: "none" }, - }, - }, - }, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(calls.map((call) => call.prompt)).toEqual(["hello"]); - }); -}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.test.ts new file mode 100644 index 000000000..86e08eb41 --- /dev/null +++ b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.test.ts @@ -0,0 +1,562 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { + createBaseRun, + getRunCliAgentMock, + getRunEmbeddedPiAgentMock, + seedSessionStore, + type EmbeddedRunParams, +} from "./agent-runner.memory-flush.test-harness.js"; +import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js"; + +let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent; + +let fixtureRoot = ""; +let caseId = 0; + +async function withTempStore(fn: (storePath: string) => Promise): Promise { + const dir = path.join(fixtureRoot, `case-${++caseId}`); + await fs.mkdir(dir, { recursive: true }); + return await fn(path.join(dir, "sessions.json")); +} + +beforeAll(async () => { + fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-memory-flush-")); + ({ runReplyAgent } = await import("./agent-runner.js")); +}); + +afterAll(async () => { + if (fixtureRoot) { + await fs.rm(fixtureRoot, { recursive: true, force: true }); + } +}); + +describe("runReplyAgent memory flush", () => { + it("skips memory flush for CLI providers", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + const runCliAgentMock = getRunCliAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + runCliAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation(async () => ({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + })); + runCliAgentMock.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + runOverrides: { provider: "codex-cli" }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(runCliAgentMock).toHaveBeenCalledTimes(1); + const call = runCliAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined; + expect(call?.prompt).toBe("hello"); + expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled(); + }); + }); + + it("uses configured prompts for memory flush runs", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array = []; + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push(params); + if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + compaction: { + memoryFlush: { + prompt: "Write notes.", + systemPrompt: "Flush memory now.", + }, + }, + }, + }, + }, + runOverrides: { extraSystemPrompt: "extra system" }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const flushCall = calls[0]; + expect(flushCall?.prompt).toContain("Write notes."); + expect(flushCall?.prompt).toContain("NO_REPLY"); + expect(flushCall?.extraSystemPrompt).toContain("extra system"); + expect(flushCall?.extraSystemPrompt).toContain("Flush memory now."); + expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY"); + expect(calls[1]?.prompt).toBe("hello"); + }); + }); + + it("runs a memory flush turn and updates session metadata", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual([DEFAULT_MEMORY_FLUSH_PROMPT, "hello"]); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); + expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1); + }); + }); + + it("skips memory flush when disabled in config", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation(async () => ({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + })); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } } }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + const call = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as { prompt?: string } | undefined; + expect(call?.prompt).toBe("hello"); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); + }); + }); + + it("skips memory flush after a prior flush in the same compaction cycle", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 2, + memoryFlushCompactionCount: 2, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual(["hello"]); + }); + }); + + it("skips memory flush when the sandbox workspace is read-only", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + sandbox: { mode: "all", workspaceAccess: "ro" }, + }, + }, + }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual(["hello"]); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); + }); + }); + + it("skips memory flush when the sandbox workspace is none", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + sandbox: { mode: "all", workspaceAccess: "none" }, + }, + }, + }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual(["hello"]); + }); + }); + + it("increments compaction count when flush compaction completes", async () => { + const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); + runEmbeddedPiAgentMock.mockReset(); + + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(2); + expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); + }); + }); +}); diff --git a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.uses-configured-prompts-memory-flush-runs.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.uses-configured-prompts-memory-flush-runs.test.ts deleted file mode 100644 index 73e1e55d2..000000000 --- a/src/auto-reply/reply/agent-runner.memory-flush.runreplyagent-memory-flush.uses-configured-prompts-memory-flush-runs.test.ts +++ /dev/null @@ -1,155 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { beforeAll, describe, expect, it } from "vitest"; -import { - createBaseRun, - getRunEmbeddedPiAgentMock, - seedSessionStore, - type EmbeddedRunParams, -} from "./agent-runner.memory-flush.test-harness.js"; -import { DEFAULT_MEMORY_FLUSH_PROMPT } from "./memory-flush.js"; - -let runReplyAgent: typeof import("./agent-runner.js").runReplyAgent; - -beforeAll(async () => { - ({ runReplyAgent } = await import("./agent-runner.js")); -}); - -describe("runReplyAgent memory flush", () => { - it("uses configured prompts for memory flush runs", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push(params); - if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { - compaction: { - memoryFlush: { - prompt: "Write notes.", - systemPrompt: "Flush memory now.", - }, - }, - }, - }, - }, - runOverrides: { extraSystemPrompt: "extra system" }, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - const flushCall = calls[0]; - expect(flushCall?.prompt).toContain("Write notes."); - expect(flushCall?.prompt).toContain("NO_REPLY"); - expect(flushCall?.extraSystemPrompt).toContain("extra system"); - expect(flushCall?.extraSystemPrompt).toContain("Flush memory now."); - expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY"); - expect(calls[1]?.prompt).toBe("hello"); - }); - it("skips memory flush after a prior flush in the same compaction cycle", async () => { - const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); - runEmbeddedPiAgentMock.mockReset(); - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-flush-")); - const storePath = path.join(tmp, "sessions.json"); - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 2, - memoryFlushCompactionCount: 2, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgent({ - commandBody: "hello", - followupRun, - queueKey: "main", - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry, - sessionStore: { [sessionKey]: sessionEntry }, - sessionKey, - storePath, - defaultModel: "anthropic/claude-opus-4-5", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: "instant", - }); - - expect(calls.map((call) => call.prompt)).toEqual(["hello"]); - }); -});