diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 264638753..c132040d9 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -127,6 +127,31 @@ describe("config schema", () => { expect(listHint?.help).toContain("bluebubbles"); }); + it("caches merged schemas for identical plugin/channel metadata", () => { + const params = { + plugins: [ + { + id: "voice-call", + name: "Voice Call", + configSchema: { type: "object", properties: { provider: { type: "string" } } }, + }, + ], + channels: [ + { + id: "matrix", + label: "Matrix", + configSchema: { type: "object", properties: { accessToken: { type: "string" } } }, + }, + ], + }; + const first = buildConfigSchema(params); + const second = buildConfigSchema({ + plugins: [{ ...params.plugins[0] }], + channels: [{ ...params.channels[0] }], + }); + expect(second).toBe(first); + }); + it("derives security/auth tags for credential paths", () => { const tags = deriveTagsForPath("gateway.auth.token"); expect(tags).toContain("security"); diff --git a/src/config/schema.ts b/src/config/schema.ts index d2add2c96..58d93215d 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -297,6 +297,43 @@ function applyChannelSchemas(schema: ConfigSchema, channels: ChannelUiMetadata[] } let cachedBase: ConfigSchemaResponse | null = null; +const mergedSchemaCache = new Map(); +const MERGED_SCHEMA_CACHE_MAX = 64; + +function buildMergedSchemaCacheKey(params: { + plugins: PluginUiMetadata[]; + channels: ChannelUiMetadata[]; +}): string { + const plugins = params.plugins + .map((plugin) => ({ + id: plugin.id, + name: plugin.name, + description: plugin.description, + configSchema: plugin.configSchema ?? null, + configUiHints: plugin.configUiHints ?? null, + })) + .toSorted((a, b) => a.id.localeCompare(b.id)); + const channels = params.channels + .map((channel) => ({ + id: channel.id, + label: channel.label, + description: channel.description, + configSchema: channel.configSchema ?? null, + configUiHints: channel.configUiHints ?? null, + })) + .toSorted((a, b) => a.id.localeCompare(b.id)); + return JSON.stringify({ plugins, channels }); +} + +function setMergedSchemaCache(key: string, value: ConfigSchemaResponse): void { + if (mergedSchemaCache.size >= MERGED_SCHEMA_CACHE_MAX) { + const oldest = mergedSchemaCache.keys().next(); + if (!oldest.done) { + mergedSchemaCache.delete(oldest.value); + } + } + mergedSchemaCache.set(key, value); +} function stripChannelSchema(schema: ConfigSchema): ConfigSchema { const next = cloneSchema(schema); @@ -349,6 +386,11 @@ export function buildConfigSchema(params?: { if (plugins.length === 0 && channels.length === 0) { return base; } + const cacheKey = buildMergedSchemaCacheKey({ plugins, channels }); + const cached = mergedSchemaCache.get(cacheKey); + if (cached) { + return cached; + } const mergedWithoutSensitiveHints = applyHeartbeatTargetHints( applyChannelHints(applyPluginHints(base.uiHints, plugins), channels), channels, @@ -362,9 +404,11 @@ export function buildConfigSchema(params?: { applySensitiveHints(mergedWithoutSensitiveHints, extensionHintKeys), ); const mergedSchema = applyChannelSchemas(applyPluginSchemas(base.schema, plugins), channels); - return { + const merged = { ...base, schema: mergedSchema, uiHints: mergedHints, }; + setMergedSchemaCache(cacheKey, merged); + return merged; } diff --git a/src/config/sessions.cache.test.ts b/src/config/sessions.cache.test.ts index a77b1fdc2..ae3f81d64 100644 --- a/src/config/sessions.cache.test.ts +++ b/src/config/sessions.cache.test.ts @@ -69,21 +69,21 @@ describe("Session Store Cache", () => { expect(loaded).toEqual(testStore); }); - it("should cache session store on first load when file is unchanged", async () => { + it("should serve freshly saved session stores from cache without disk reads", async () => { const testStore = createSingleSessionStore(); await saveSessionStore(storePath, testStore); const readSpy = vi.spyOn(fs, "readFileSync"); - // First load - from disk + // First load - served from write-through cache const loaded1 = loadSessionStore(storePath); expect(loaded1).toEqual(testStore); - // Second load - should return cached data (no extra disk read) + // Second load - should stay cached (still no disk read) const loaded2 = loadSessionStore(storePath); expect(loaded2).toEqual(testStore); - expect(readSpy).toHaveBeenCalledTimes(1); + expect(readSpy).toHaveBeenCalledTimes(0); readSpy.mockRestore(); }); diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index 4630bca0f..dfe4b74e9 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -2,7 +2,8 @@ import fs from "node:fs"; import fsPromises from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import * as jsonFiles from "../../infra/json-files.js"; import { clearSessionStoreCacheForTest, loadSessionStore, @@ -200,6 +201,24 @@ describe("session store lock (Promise chain mutex)", () => { expect((store[key] as Record).counter).toBe(N); }); + it("skips session store disk writes when payload is unchanged", async () => { + const key = "agent:main:no-op-save"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s-noop", updatedAt: Date.now() }, + }); + + const writeSpy = vi.spyOn(jsonFiles, "writeTextAtomic"); + await updateSessionStore( + storePath, + async () => { + // Intentionally no-op mutation. + }, + { skipMaintenance: true }, + ); + expect(writeSpy).not.toHaveBeenCalled(); + writeSpy.mockRestore(); + }); + it("multiple consecutive errors do not permanently poison the queue", async () => { const key = "agent:main:multi-err"; const { storePath } = await makeTmpStore({ diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index bd5a3ebb0..473f9a69d 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -39,9 +39,11 @@ type SessionStoreCacheEntry = { loadedAt: number; storePath: string; mtimeMs?: number; + serialized?: string; }; const SESSION_STORE_CACHE = new Map(); +const SESSION_STORE_SERIALIZED_CACHE = new Map(); const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s) function isSessionStoreRecord(value: unknown): value is Record { @@ -67,6 +69,7 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { function invalidateSessionStoreCache(storePath: string): void { SESSION_STORE_CACHE.delete(storePath); + SESSION_STORE_SERIALIZED_CACHE.delete(storePath); } function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { @@ -170,6 +173,7 @@ function normalizeSessionStore(store: Record): void { export function clearSessionStoreCacheForTest(): void { SESSION_STORE_CACHE.clear(); + SESSION_STORE_SERIALIZED_CACHE.clear(); for (const queue of LOCK_QUEUES.values()) { for (const task of queue.pending) { task.reject(new Error("session store queue cleared for test")); @@ -220,6 +224,7 @@ export function loadSessionStore( // writer to finish. let store: Record = {}; let mtimeMs = getFileMtimeMs(storePath); + let serializedFromDisk: string | undefined; const maxReadAttempts = process.platform === "win32" ? 3 : 1; const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined; for (let attempt = 0; attempt < maxReadAttempts; attempt++) { @@ -233,6 +238,7 @@ export function loadSessionStore( const parsed = JSON.parse(raw); if (isSessionStoreRecord(parsed)) { store = parsed; + serializedFromDisk = raw; } mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs; break; @@ -245,6 +251,11 @@ export function loadSessionStore( // Final attempt failed; proceed with an empty store. } } + if (serializedFromDisk !== undefined) { + SESSION_STORE_SERIALIZED_CACHE.set(storePath, serializedFromDisk); + } else { + SESSION_STORE_SERIALIZED_CACHE.delete(storePath); + } // Best-effort migration: message provider → channel naming. for (const entry of Object.values(store)) { @@ -277,6 +288,7 @@ export function loadSessionStore( loadedAt: Date.now(), storePath, mtimeMs, + serialized: serializedFromDisk, }); } @@ -639,14 +651,31 @@ type SaveSessionStoreOptions = { maintenanceOverride?: Partial; }; +function updateSessionStoreWriteCaches(params: { + storePath: string; + store: Record; + serialized: string; +}): void { + const mtimeMs = getFileMtimeMs(params.storePath); + SESSION_STORE_SERIALIZED_CACHE.set(params.storePath, params.serialized); + if (!isSessionStoreCacheEnabled()) { + SESSION_STORE_CACHE.delete(params.storePath); + return; + } + SESSION_STORE_CACHE.set(params.storePath, { + store: structuredClone(params.store), + loadedAt: Date.now(), + storePath: params.storePath, + mtimeMs, + serialized: params.serialized, + }); +} + async function saveSessionStoreUnlocked( storePath: string, store: Record, opts?: SaveSessionStoreOptions, ): Promise { - // Invalidate cache on write to ensure consistency - invalidateSessionStoreCache(storePath); - normalizeSessionStore(store); if (!opts?.skipMaintenance) { @@ -770,12 +799,17 @@ async function saveSessionStoreUnlocked( await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); const json = JSON.stringify(store, null, 2); + if (SESSION_STORE_SERIALIZED_CACHE.get(storePath) === json) { + updateSessionStoreWriteCaches({ storePath, store, serialized: json }); + return; + } // Windows: keep retry semantics because rename can fail while readers hold locks. if (process.platform === "win32") { for (let i = 0; i < 5; i++) { try { await writeTextAtomic(storePath, json, { mode: 0o600 }); + updateSessionStoreWriteCaches({ storePath, store, serialized: json }); return; } catch (err) { const code = @@ -799,6 +833,7 @@ async function saveSessionStoreUnlocked( try { await writeTextAtomic(storePath, json, { mode: 0o600 }); + updateSessionStoreWriteCaches({ storePath, store, serialized: json }); } catch (err) { const code = err && typeof err === "object" && "code" in err @@ -810,6 +845,7 @@ async function saveSessionStoreUnlocked( // Best-effort: try a direct write (recreating the parent dir), otherwise ignore. try { await writeTextAtomic(storePath, json, { mode: 0o600 }); + updateSessionStoreWriteCaches({ storePath, store, serialized: json }); } catch (err2) { const code2 = err2 && typeof err2 === "object" && "code" in err2 diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index e6189f8d2..ac7852eaa 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -133,6 +133,7 @@ export abstract class MemoryManagerSyncOps { string, { lastSize: number; pendingBytes: number; pendingMessages: number } >(); + private lastMetaSerialized: string | null = null; protected abstract readonly cache: { enabled: boolean; maxEntries?: number }; protected abstract db: DatabaseSync; @@ -1166,22 +1167,30 @@ export abstract class MemoryManagerSyncOps { | { value: string } | undefined; if (!row?.value) { + this.lastMetaSerialized = null; return null; } try { - return JSON.parse(row.value) as MemoryIndexMeta; + const parsed = JSON.parse(row.value) as MemoryIndexMeta; + this.lastMetaSerialized = row.value; + return parsed; } catch { + this.lastMetaSerialized = null; return null; } } protected writeMeta(meta: MemoryIndexMeta) { const value = JSON.stringify(meta); + if (this.lastMetaSerialized === value) { + return; + } this.db .prepare( `INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, ) .run(META_KEY, value); + this.lastMetaSerialized = value; } private resolveConfiguredSourcesForMeta(): MemorySource[] {