diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 4d7c3da87..b00b2c37e 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -96,6 +96,57 @@ function parseUsageFromTranscriptLine(line: string): ReturnType { + const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts); + if (!logPath) { + return undefined; + } + try { + const stat = await fs.promises.stat(logPath); + const size = Math.floor(stat.size); + return Number.isFinite(size) && size >= 0 ? size : undefined; + } catch { + return undefined; + } +} + async function readLastNonzeroUsageFromSessionLog(logPath: string) { const handle = await fs.promises.open(logPath, "r"); try { @@ -134,28 +185,12 @@ export async function readPromptTokensFromSessionLog( sessionKey?: string, opts?: { storePath?: string }, ): Promise { - if (!sessionId) { + const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts); + if (!logPath) { return undefined; } try { - const transcriptPath = ( - sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined - )?.transcriptPath?.trim(); - const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath; - const agentId = resolveAgentIdFromSessionKey(sessionKey); - const pathOpts = resolveSessionFilePathOptions({ - agentId, - storePath: opts?.storePath, - }); - // Normalize sessionFile through resolveSessionFilePath so relative entries - // are resolved against the sessions dir/store layout, not process.cwd(). - const logPath = resolveSessionFilePath( - sessionId, - sessionFile ? { sessionFile } : sessionEntry, - pathOpts, - ); - const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath); if (!lastUsage) { return undefined; @@ -262,6 +297,23 @@ export async function runMemoryFlushIfNeeded(params: { const shouldReadTranscript = canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput); + const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes; + const shouldCheckTranscriptSizeForForcedFlush = + canAttemptFlush && + entry && + Number.isFinite(forceFlushTranscriptBytes) && + forceFlushTranscriptBytes > 0; + const transcriptByteSize = shouldCheckTranscriptSizeForForcedFlush + ? await readSessionLogByteSize( + params.followupRun.run.sessionId, + entry, + params.sessionKey ?? params.followupRun.run.sessionKey, + { storePath: params.storePath }, + ) + : undefined; + const shouldForceFlushByTranscriptSize = + typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes; + const transcriptUsageSnapshot = shouldReadTranscript ? await readPromptTokensFromSessionLog( params.followupRun.run.sessionId, @@ -341,21 +393,23 @@ export async function runMemoryFlushIfNeeded(params: { `compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` + `persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` + `promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` + - `projectedTokenCount=${projectedTokenCount ?? "undefined"}`, + `projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` + + `forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`, ); const shouldFlushMemory = - memoryFlushSettings && - memoryFlushWritable && - !params.isHeartbeat && - !isCli && - shouldRunMemoryFlush({ - entry, - tokenCount: tokenCountForFlush, - contextWindowTokens, - reserveTokensFloor: memoryFlushSettings.reserveTokensFloor, - softThresholdTokens: memoryFlushSettings.softThresholdTokens, - }); + (memoryFlushSettings && + memoryFlushWritable && + !params.isHeartbeat && + !isCli && + shouldRunMemoryFlush({ + entry, + tokenCount: tokenCountForFlush, + contextWindowTokens, + reserveTokensFloor: memoryFlushSettings.reserveTokensFloor, + softThresholdTokens: memoryFlushSettings.softThresholdTokens, + })) || + shouldForceFlushByTranscriptSize; if (!shouldFlushMemory) { return entry ?? params.sessionEntry; diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts index 294d9c938..85fd817de 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts @@ -8,7 +8,12 @@ import type { TypingMode } from "../../config/types.js"; import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; -import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; +import { + enqueueFollowupRun, + scheduleFollowupDrain, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; type AgentRunParams = { @@ -87,6 +92,7 @@ beforeEach(() => { state.runEmbeddedPiAgentMock.mockClear(); state.runCliAgentMock.mockClear(); vi.mocked(enqueueFollowupRun).mockClear(); + vi.mocked(scheduleFollowupDrain).mockClear(); vi.stubEnv("OPENCLAW_TEST_FAST", "1"); }); @@ -311,6 +317,25 @@ describe("runReplyAgent heartbeat followup guard", () => { expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1); expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled(); }); + + it("drains followup queue when an unexpected exception escapes the run path", async () => { + const accounting = await import("./session-run-accounting.js"); + const persistSpy = vi + .spyOn(accounting, "persistRunSessionUsage") + .mockRejectedValueOnce(new Error("persist exploded")); + state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }); + + try { + const { run } = createMinimalRun(); + await expect(run()).rejects.toThrow("persist exploded"); + expect(vi.mocked(scheduleFollowupDrain)).toHaveBeenCalledTimes(1); + } finally { + persistSpy.mockRestore(); + } + }); }); describe("runReplyAgent typing (heartbeat)", () => { @@ -1661,6 +1686,68 @@ describe("runReplyAgent memory flush", () => { }); }); + it("forces memory flush when transcript file exceeds configured byte threshold", async () => { + await withTempStore(async (storePath) => { + const sessionKey = "main"; + const sessionFile = "oversized-session.jsonl"; + const transcriptPath = path.join(path.dirname(storePath), sessionFile); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "x".repeat(3_000), "utf-8"); + + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + sessionFile, + totalTokens: 10, + totalTokensFresh: false, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + if (params.prompt?.includes("Pre-compaction memory flush.")) { + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }); + + const baseRun = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + compaction: { + memoryFlush: { + forceFlushTranscriptBytes: 256, + }, + }, + }, + }, + }, + runOverrides: { sessionFile }, + }); + + await runReplyAgentWithBase({ + baseRun, + storePath, + sessionKey, + sessionEntry, + commandBody: "hello", + }); + + expect(calls).toHaveLength(2); + expect(calls[0]?.prompt).toContain("Pre-compaction memory flush."); + expect(calls[1]?.prompt).toBe("hello"); + }); + }); + it("skips memory flush when disabled in config", async () => { await withTempStore(async (storePath) => { const sessionKey = "main"; diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 1ae1b36f1..a799fa9c6 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -715,6 +715,11 @@ export async function runReplyAgent(params: { queueKey, runFollowupTurn, ); + } catch (error) { + // Keep the followup queue moving even when an unexpected exception escapes + // the run path; the caller still receives the original error. + finalizeWithFollowup(undefined, queueKey, runFollowupTurn); + throw error; } finally { blockReplyPipeline?.stop(); typing.markRunComplete(); diff --git a/src/auto-reply/reply/memory-flush.ts b/src/auto-reply/reply/memory-flush.ts index 536a66c93..4fe74e7d4 100644 --- a/src/auto-reply/reply/memory-flush.ts +++ b/src/auto-reply/reply/memory-flush.ts @@ -2,11 +2,13 @@ import { lookupContextTokens } from "../../agents/context.js"; import { resolveCronStyleNow } from "../../agents/current-time.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR } from "../../agents/pi-settings.js"; +import { parseByteSize } from "../../cli/parse-bytes.js"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; export const DEFAULT_MEMORY_FLUSH_SOFT_TOKENS = 4000; +export const DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES = 2 * 1024 * 1024; export const DEFAULT_MEMORY_FLUSH_PROMPT = [ "Pre-compaction memory flush.", @@ -58,6 +60,11 @@ export function resolveMemoryFlushPromptForRun(params: { export type MemoryFlushSettings = { enabled: boolean; softThresholdTokens: number; + /** + * Force a pre-compaction memory flush when the session transcript reaches this + * size. Set to 0 to disable byte-size based triggering. + */ + forceFlushTranscriptBytes: number; prompt: string; systemPrompt: string; reserveTokensFloor: number; @@ -71,6 +78,26 @@ const normalizeNonNegativeInt = (value: unknown): number | null => { return int >= 0 ? int : null; }; +const normalizeOptionalByteSize = (value: unknown): number | null => { + if (typeof value === "number" && Number.isFinite(value)) { + const int = Math.floor(value); + return int >= 0 ? int : null; + } + if (typeof value === "string") { + const trimmed = value.trim(); + if (!trimmed) { + return null; + } + try { + const bytes = parseByteSize(trimmed, { defaultUnit: "b" }); + return bytes >= 0 ? bytes : null; + } catch { + return null; + } + } + return null; +}; + export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSettings | null { const defaults = cfg?.agents?.defaults?.compaction?.memoryFlush; const enabled = defaults?.enabled ?? true; @@ -79,6 +106,9 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet } const softThresholdTokens = normalizeNonNegativeInt(defaults?.softThresholdTokens) ?? DEFAULT_MEMORY_FLUSH_SOFT_TOKENS; + const forceFlushTranscriptBytes = + normalizeOptionalByteSize(defaults?.forceFlushTranscriptBytes) ?? + DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES; const prompt = defaults?.prompt?.trim() || DEFAULT_MEMORY_FLUSH_PROMPT; const systemPrompt = defaults?.systemPrompt?.trim() || DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT; const reserveTokensFloor = @@ -88,6 +118,7 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet return { enabled, softThresholdTokens, + forceFlushTranscriptBytes, prompt: ensureNoReplyHint(prompt), systemPrompt: ensureNoReplyHint(systemPrompt), reserveTokensFloor, diff --git a/src/auto-reply/reply/reply-state.test.ts b/src/auto-reply/reply/reply-state.test.ts index 75cc40252..0c619c132 100644 --- a/src/auto-reply/reply/reply-state.test.ts +++ b/src/auto-reply/reply/reply-state.test.ts @@ -15,6 +15,7 @@ import { recordPendingHistoryEntryIfEnabled, } from "./history.js"; import { + DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES, DEFAULT_MEMORY_FLUSH_SOFT_TOKENS, resolveMemoryFlushContextWindowTokens, resolveMemoryFlushSettings, @@ -198,6 +199,7 @@ describe("memory flush settings", () => { const settings = resolveMemoryFlushSettings(); expect(settings).not.toBeNull(); expect(settings?.enabled).toBe(true); + expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES); expect(settings?.prompt.length).toBeGreaterThan(0); expect(settings?.systemPrompt.length).toBeGreaterThan(0); }); @@ -244,8 +246,25 @@ describe("memory flush settings", () => { }); expect(settings?.softThresholdTokens).toBe(DEFAULT_MEMORY_FLUSH_SOFT_TOKENS); + expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES); expect(settings?.reserveTokensFloor).toBe(DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR); }); + + it("parses forceFlushTranscriptBytes from byte-size strings", () => { + const settings = resolveMemoryFlushSettings({ + agents: { + defaults: { + compaction: { + memoryFlush: { + forceFlushTranscriptBytes: "3mb", + }, + }, + }, + }, + }); + + expect(settings?.forceFlushTranscriptBytes).toBe(3 * 1024 * 1024); + }); }); describe("shouldRunMemoryFlush", () => { diff --git a/src/auto-reply/reply/session-usage.ts b/src/auto-reply/reply/session-usage.ts index 2d7b6e7f9..6638a6738 100644 --- a/src/auto-reply/reply/session-usage.ts +++ b/src/auto-reply/reply/session-usage.ts @@ -93,8 +93,11 @@ export async function persistSessionUsageUpdate(params: { if (hasUsage) { patch.inputTokens = params.usage?.input ?? 0; patch.outputTokens = params.usage?.output ?? 0; - patch.cacheRead = params.usage?.cacheRead ?? 0; - patch.cacheWrite = params.usage?.cacheWrite ?? 0; + // Cache counters should reflect the latest context snapshot when + // available, not accumulated per-call totals across a whole run. + const cacheUsage = params.lastCallUsage ?? params.usage; + patch.cacheRead = cacheUsage?.cacheRead ?? 0; + patch.cacheWrite = cacheUsage?.cacheWrite ?? 0; } // Missing a last-call snapshot (and promptTokens fallback) means // context utilization is stale/unknown. diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index c17837bb4..1594f0ac6 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -1170,6 +1170,40 @@ describe("persistSessionUsageUpdate", () => { expect(stored[sessionKey].outputTokens).toBe(10_000); }); + it("uses lastCallUsage cache counters when available", async () => { + const storePath = await createStorePath("openclaw-usage-cache-"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + entry: { sessionId: "s1", updatedAt: Date.now() }, + }); + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage: { + input: 100_000, + output: 8_000, + cacheRead: 260_000, + cacheWrite: 90_000, + }, + lastCallUsage: { + input: 12_000, + output: 1_000, + cacheRead: 18_000, + cacheWrite: 4_000, + }, + contextTokensUsed: 200_000, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].inputTokens).toBe(100_000); + expect(stored[sessionKey].outputTokens).toBe(8_000); + expect(stored[sessionKey].cacheRead).toBe(18_000); + expect(stored[sessionKey].cacheWrite).toBe(4_000); + }); + it("marks totalTokens as unknown when no fresh context snapshot is available", async () => { const storePath = await createStorePath("openclaw-usage-"); const sessionKey = "main"; diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 303d3b953..00e514776 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -295,6 +295,11 @@ export type AgentCompactionMemoryFlushConfig = { enabled?: boolean; /** Run the memory flush when context is within this many tokens of the compaction threshold. */ softThresholdTokens?: number; + /** + * Force a memory flush when transcript size reaches this threshold + * (bytes, or byte-size string like "2mb"). Set to 0 to disable. + */ + forceFlushTranscriptBytes?: number | string; /** User prompt used for the memory flush turn (NO_REPLY is enforced if missing). */ prompt?: string; /** System prompt appended for the memory flush turn. */ diff --git a/src/config/zod-schema.agent-defaults.ts b/src/config/zod-schema.agent-defaults.ts index afbe226b0..1c04085f3 100644 --- a/src/config/zod-schema.agent-defaults.ts +++ b/src/config/zod-schema.agent-defaults.ts @@ -1,4 +1,5 @@ import { z } from "zod"; +import { parseByteSize } from "../cli/parse-bytes.js"; import { HeartbeatSchema, AgentSandboxSchema, @@ -92,6 +93,19 @@ export const AgentDefaultsSchema = z .object({ enabled: z.boolean().optional(), softThresholdTokens: z.number().int().nonnegative().optional(), + forceFlushTranscriptBytes: z + .union([ + z.number().int().nonnegative(), + z.string().refine((value) => { + try { + parseByteSize(value.trim(), { defaultUnit: "b" }); + return true; + } catch { + return false; + } + }, "Expected byte size string like 2mb"), + ]) + .optional(), prompt: z.string().optional(), systemPrompt: z.string().optional(), })