perf(config): skip redundant schema and session-store work
This commit is contained in:
@@ -127,6 +127,31 @@ describe("config schema", () => {
|
||||
expect(listHint?.help).toContain("bluebubbles");
|
||||
});
|
||||
|
||||
it("caches merged schemas for identical plugin/channel metadata", () => {
|
||||
const params = {
|
||||
plugins: [
|
||||
{
|
||||
id: "voice-call",
|
||||
name: "Voice Call",
|
||||
configSchema: { type: "object", properties: { provider: { type: "string" } } },
|
||||
},
|
||||
],
|
||||
channels: [
|
||||
{
|
||||
id: "matrix",
|
||||
label: "Matrix",
|
||||
configSchema: { type: "object", properties: { accessToken: { type: "string" } } },
|
||||
},
|
||||
],
|
||||
};
|
||||
const first = buildConfigSchema(params);
|
||||
const second = buildConfigSchema({
|
||||
plugins: [{ ...params.plugins[0] }],
|
||||
channels: [{ ...params.channels[0] }],
|
||||
});
|
||||
expect(second).toBe(first);
|
||||
});
|
||||
|
||||
it("derives security/auth tags for credential paths", () => {
|
||||
const tags = deriveTagsForPath("gateway.auth.token");
|
||||
expect(tags).toContain("security");
|
||||
|
||||
@@ -297,6 +297,43 @@ function applyChannelSchemas(schema: ConfigSchema, channels: ChannelUiMetadata[]
|
||||
}
|
||||
|
||||
let cachedBase: ConfigSchemaResponse | null = null;
|
||||
const mergedSchemaCache = new Map<string, ConfigSchemaResponse>();
|
||||
const MERGED_SCHEMA_CACHE_MAX = 64;
|
||||
|
||||
function buildMergedSchemaCacheKey(params: {
|
||||
plugins: PluginUiMetadata[];
|
||||
channels: ChannelUiMetadata[];
|
||||
}): string {
|
||||
const plugins = params.plugins
|
||||
.map((plugin) => ({
|
||||
id: plugin.id,
|
||||
name: plugin.name,
|
||||
description: plugin.description,
|
||||
configSchema: plugin.configSchema ?? null,
|
||||
configUiHints: plugin.configUiHints ?? null,
|
||||
}))
|
||||
.toSorted((a, b) => a.id.localeCompare(b.id));
|
||||
const channels = params.channels
|
||||
.map((channel) => ({
|
||||
id: channel.id,
|
||||
label: channel.label,
|
||||
description: channel.description,
|
||||
configSchema: channel.configSchema ?? null,
|
||||
configUiHints: channel.configUiHints ?? null,
|
||||
}))
|
||||
.toSorted((a, b) => a.id.localeCompare(b.id));
|
||||
return JSON.stringify({ plugins, channels });
|
||||
}
|
||||
|
||||
function setMergedSchemaCache(key: string, value: ConfigSchemaResponse): void {
|
||||
if (mergedSchemaCache.size >= MERGED_SCHEMA_CACHE_MAX) {
|
||||
const oldest = mergedSchemaCache.keys().next();
|
||||
if (!oldest.done) {
|
||||
mergedSchemaCache.delete(oldest.value);
|
||||
}
|
||||
}
|
||||
mergedSchemaCache.set(key, value);
|
||||
}
|
||||
|
||||
function stripChannelSchema(schema: ConfigSchema): ConfigSchema {
|
||||
const next = cloneSchema(schema);
|
||||
@@ -349,6 +386,11 @@ export function buildConfigSchema(params?: {
|
||||
if (plugins.length === 0 && channels.length === 0) {
|
||||
return base;
|
||||
}
|
||||
const cacheKey = buildMergedSchemaCacheKey({ plugins, channels });
|
||||
const cached = mergedSchemaCache.get(cacheKey);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const mergedWithoutSensitiveHints = applyHeartbeatTargetHints(
|
||||
applyChannelHints(applyPluginHints(base.uiHints, plugins), channels),
|
||||
channels,
|
||||
@@ -362,9 +404,11 @@ export function buildConfigSchema(params?: {
|
||||
applySensitiveHints(mergedWithoutSensitiveHints, extensionHintKeys),
|
||||
);
|
||||
const mergedSchema = applyChannelSchemas(applyPluginSchemas(base.schema, plugins), channels);
|
||||
return {
|
||||
const merged = {
|
||||
...base,
|
||||
schema: mergedSchema,
|
||||
uiHints: mergedHints,
|
||||
};
|
||||
setMergedSchemaCache(cacheKey, merged);
|
||||
return merged;
|
||||
}
|
||||
|
||||
@@ -69,21 +69,21 @@ describe("Session Store Cache", () => {
|
||||
expect(loaded).toEqual(testStore);
|
||||
});
|
||||
|
||||
it("should cache session store on first load when file is unchanged", async () => {
|
||||
it("should serve freshly saved session stores from cache without disk reads", async () => {
|
||||
const testStore = createSingleSessionStore();
|
||||
|
||||
await saveSessionStore(storePath, testStore);
|
||||
|
||||
const readSpy = vi.spyOn(fs, "readFileSync");
|
||||
|
||||
// First load - from disk
|
||||
// First load - served from write-through cache
|
||||
const loaded1 = loadSessionStore(storePath);
|
||||
expect(loaded1).toEqual(testStore);
|
||||
|
||||
// Second load - should return cached data (no extra disk read)
|
||||
// Second load - should stay cached (still no disk read)
|
||||
const loaded2 = loadSessionStore(storePath);
|
||||
expect(loaded2).toEqual(testStore);
|
||||
expect(readSpy).toHaveBeenCalledTimes(1);
|
||||
expect(readSpy).toHaveBeenCalledTimes(0);
|
||||
readSpy.mockRestore();
|
||||
});
|
||||
|
||||
|
||||
@@ -2,7 +2,8 @@ import fs from "node:fs";
|
||||
import fsPromises from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import * as jsonFiles from "../../infra/json-files.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
loadSessionStore,
|
||||
@@ -200,6 +201,24 @@ describe("session store lock (Promise chain mutex)", () => {
|
||||
expect((store[key] as Record<string, unknown>).counter).toBe(N);
|
||||
});
|
||||
|
||||
it("skips session store disk writes when payload is unchanged", async () => {
|
||||
const key = "agent:main:no-op-save";
|
||||
const { storePath } = await makeTmpStore({
|
||||
[key]: { sessionId: "s-noop", updatedAt: Date.now() },
|
||||
});
|
||||
|
||||
const writeSpy = vi.spyOn(jsonFiles, "writeTextAtomic");
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
async () => {
|
||||
// Intentionally no-op mutation.
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
expect(writeSpy).not.toHaveBeenCalled();
|
||||
writeSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("multiple consecutive errors do not permanently poison the queue", async () => {
|
||||
const key = "agent:main:multi-err";
|
||||
const { storePath } = await makeTmpStore({
|
||||
|
||||
@@ -39,9 +39,11 @@ type SessionStoreCacheEntry = {
|
||||
loadedAt: number;
|
||||
storePath: string;
|
||||
mtimeMs?: number;
|
||||
serialized?: string;
|
||||
};
|
||||
|
||||
const SESSION_STORE_CACHE = new Map<string, SessionStoreCacheEntry>();
|
||||
const SESSION_STORE_SERIALIZED_CACHE = new Map<string, string>();
|
||||
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
|
||||
|
||||
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
|
||||
@@ -67,6 +69,7 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean {
|
||||
|
||||
function invalidateSessionStoreCache(storePath: string): void {
|
||||
SESSION_STORE_CACHE.delete(storePath);
|
||||
SESSION_STORE_SERIALIZED_CACHE.delete(storePath);
|
||||
}
|
||||
|
||||
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
|
||||
@@ -170,6 +173,7 @@ function normalizeSessionStore(store: Record<string, SessionEntry>): void {
|
||||
|
||||
export function clearSessionStoreCacheForTest(): void {
|
||||
SESSION_STORE_CACHE.clear();
|
||||
SESSION_STORE_SERIALIZED_CACHE.clear();
|
||||
for (const queue of LOCK_QUEUES.values()) {
|
||||
for (const task of queue.pending) {
|
||||
task.reject(new Error("session store queue cleared for test"));
|
||||
@@ -220,6 +224,7 @@ export function loadSessionStore(
|
||||
// writer to finish.
|
||||
let store: Record<string, SessionEntry> = {};
|
||||
let mtimeMs = getFileMtimeMs(storePath);
|
||||
let serializedFromDisk: string | undefined;
|
||||
const maxReadAttempts = process.platform === "win32" ? 3 : 1;
|
||||
const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined;
|
||||
for (let attempt = 0; attempt < maxReadAttempts; attempt++) {
|
||||
@@ -233,6 +238,7 @@ export function loadSessionStore(
|
||||
const parsed = JSON.parse(raw);
|
||||
if (isSessionStoreRecord(parsed)) {
|
||||
store = parsed;
|
||||
serializedFromDisk = raw;
|
||||
}
|
||||
mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs;
|
||||
break;
|
||||
@@ -245,6 +251,11 @@ export function loadSessionStore(
|
||||
// Final attempt failed; proceed with an empty store.
|
||||
}
|
||||
}
|
||||
if (serializedFromDisk !== undefined) {
|
||||
SESSION_STORE_SERIALIZED_CACHE.set(storePath, serializedFromDisk);
|
||||
} else {
|
||||
SESSION_STORE_SERIALIZED_CACHE.delete(storePath);
|
||||
}
|
||||
|
||||
// Best-effort migration: message provider → channel naming.
|
||||
for (const entry of Object.values(store)) {
|
||||
@@ -277,6 +288,7 @@ export function loadSessionStore(
|
||||
loadedAt: Date.now(),
|
||||
storePath,
|
||||
mtimeMs,
|
||||
serialized: serializedFromDisk,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -639,14 +651,31 @@ type SaveSessionStoreOptions = {
|
||||
maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfig>;
|
||||
};
|
||||
|
||||
function updateSessionStoreWriteCaches(params: {
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
serialized: string;
|
||||
}): void {
|
||||
const mtimeMs = getFileMtimeMs(params.storePath);
|
||||
SESSION_STORE_SERIALIZED_CACHE.set(params.storePath, params.serialized);
|
||||
if (!isSessionStoreCacheEnabled()) {
|
||||
SESSION_STORE_CACHE.delete(params.storePath);
|
||||
return;
|
||||
}
|
||||
SESSION_STORE_CACHE.set(params.storePath, {
|
||||
store: structuredClone(params.store),
|
||||
loadedAt: Date.now(),
|
||||
storePath: params.storePath,
|
||||
mtimeMs,
|
||||
serialized: params.serialized,
|
||||
});
|
||||
}
|
||||
|
||||
async function saveSessionStoreUnlocked(
|
||||
storePath: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
opts?: SaveSessionStoreOptions,
|
||||
): Promise<void> {
|
||||
// Invalidate cache on write to ensure consistency
|
||||
invalidateSessionStoreCache(storePath);
|
||||
|
||||
normalizeSessionStore(store);
|
||||
|
||||
if (!opts?.skipMaintenance) {
|
||||
@@ -770,12 +799,17 @@ async function saveSessionStoreUnlocked(
|
||||
|
||||
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
|
||||
const json = JSON.stringify(store, null, 2);
|
||||
if (SESSION_STORE_SERIALIZED_CACHE.get(storePath) === json) {
|
||||
updateSessionStoreWriteCaches({ storePath, store, serialized: json });
|
||||
return;
|
||||
}
|
||||
|
||||
// Windows: keep retry semantics because rename can fail while readers hold locks.
|
||||
if (process.platform === "win32") {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
try {
|
||||
await writeTextAtomic(storePath, json, { mode: 0o600 });
|
||||
updateSessionStoreWriteCaches({ storePath, store, serialized: json });
|
||||
return;
|
||||
} catch (err) {
|
||||
const code =
|
||||
@@ -799,6 +833,7 @@ async function saveSessionStoreUnlocked(
|
||||
|
||||
try {
|
||||
await writeTextAtomic(storePath, json, { mode: 0o600 });
|
||||
updateSessionStoreWriteCaches({ storePath, store, serialized: json });
|
||||
} catch (err) {
|
||||
const code =
|
||||
err && typeof err === "object" && "code" in err
|
||||
@@ -810,6 +845,7 @@ async function saveSessionStoreUnlocked(
|
||||
// Best-effort: try a direct write (recreating the parent dir), otherwise ignore.
|
||||
try {
|
||||
await writeTextAtomic(storePath, json, { mode: 0o600 });
|
||||
updateSessionStoreWriteCaches({ storePath, store, serialized: json });
|
||||
} catch (err2) {
|
||||
const code2 =
|
||||
err2 && typeof err2 === "object" && "code" in err2
|
||||
|
||||
@@ -133,6 +133,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
string,
|
||||
{ lastSize: number; pendingBytes: number; pendingMessages: number }
|
||||
>();
|
||||
private lastMetaSerialized: string | null = null;
|
||||
|
||||
protected abstract readonly cache: { enabled: boolean; maxEntries?: number };
|
||||
protected abstract db: DatabaseSync;
|
||||
@@ -1166,22 +1167,30 @@ export abstract class MemoryManagerSyncOps {
|
||||
| { value: string }
|
||||
| undefined;
|
||||
if (!row?.value) {
|
||||
this.lastMetaSerialized = null;
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return JSON.parse(row.value) as MemoryIndexMeta;
|
||||
const parsed = JSON.parse(row.value) as MemoryIndexMeta;
|
||||
this.lastMetaSerialized = row.value;
|
||||
return parsed;
|
||||
} catch {
|
||||
this.lastMetaSerialized = null;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected writeMeta(meta: MemoryIndexMeta) {
|
||||
const value = JSON.stringify(meta);
|
||||
if (this.lastMetaSerialized === value) {
|
||||
return;
|
||||
}
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`,
|
||||
)
|
||||
.run(META_KEY, value);
|
||||
this.lastMetaSerialized = value;
|
||||
}
|
||||
|
||||
private resolveConfiguredSourcesForMeta(): MemorySource[] {
|
||||
|
||||
Reference in New Issue
Block a user