diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts new file mode 100644 index 000000000..94ceec544 --- /dev/null +++ b/src/logging/diagnostic.test.ts @@ -0,0 +1,36 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + getDiagnosticSessionStateCountForTest, + logSessionStateChange, + resetDiagnosticStateForTest, +} from "./diagnostic.js"; + +describe("diagnostic session state pruning", () => { + beforeEach(() => { + vi.useFakeTimers(); + resetDiagnosticStateForTest(); + }); + + afterEach(() => { + resetDiagnosticStateForTest(); + vi.useRealTimers(); + }); + + it("evicts stale idle session states", () => { + logSessionStateChange({ sessionId: "stale-1", state: "idle" }); + expect(getDiagnosticSessionStateCountForTest()).toBe(1); + + vi.advanceTimersByTime(31 * 60 * 1000); + logSessionStateChange({ sessionId: "fresh-1", state: "idle" }); + + expect(getDiagnosticSessionStateCountForTest()).toBe(1); + }); + + it("caps tracked session states to a bounded max", () => { + for (let i = 0; i < 2105; i += 1) { + logSessionStateChange({ sessionId: `session-${i}`, state: "idle" }); + } + + expect(getDiagnosticSessionStateCountForTest()).toBe(2000); + }); +}); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 24dfc8961..abeb2b7e9 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -19,6 +19,9 @@ type SessionRef = { }; const sessionStates = new Map(); +const SESSION_STATE_TTL_MS = 30 * 60 * 1000; +const SESSION_STATE_PRUNE_INTERVAL_MS = 60 * 1000; +const SESSION_STATE_MAX_ENTRIES = 2000; const webhookStats = { received: 0, @@ -28,16 +31,49 @@ const webhookStats = { }; let lastActivityAt = 0; +let lastSessionPruneAt = 0; function markActivity() { lastActivityAt = Date.now(); } +function pruneSessionStates(now = Date.now(), force = false): void { + const shouldPruneForSize = sessionStates.size > SESSION_STATE_MAX_ENTRIES; + if (!force && !shouldPruneForSize && now - lastSessionPruneAt < SESSION_STATE_PRUNE_INTERVAL_MS) { + return; + } + lastSessionPruneAt = now; + + for (const [key, state] of sessionStates.entries()) { + const ageMs = now - state.lastActivity; + const isIdle = state.state === "idle"; + if (isIdle && state.queueDepth <= 0 && ageMs > SESSION_STATE_TTL_MS) { + sessionStates.delete(key); + } + } + + if (sessionStates.size <= SESSION_STATE_MAX_ENTRIES) { + return; + } + const excess = sessionStates.size - SESSION_STATE_MAX_ENTRIES; + const ordered = Array.from(sessionStates.entries()).toSorted( + (a, b) => a[1].lastActivity - b[1].lastActivity, + ); + for (let i = 0; i < excess; i += 1) { + const key = ordered[i]?.[0]; + if (!key) { + break; + } + sessionStates.delete(key); + } +} + function resolveSessionKey({ sessionKey, sessionId }: SessionRef) { return sessionKey ?? sessionId ?? "unknown"; } function getSessionState(ref: SessionRef): SessionState { + pruneSessionStates(); const key = resolveSessionKey(ref); const existing = sessionStates.get(key); if (existing) { @@ -57,6 +93,7 @@ function getSessionState(ref: SessionRef): SessionState { queueDepth: 0, }; sessionStates.set(key, created); + pruneSessionStates(Date.now(), true); return created; } @@ -303,6 +340,7 @@ export function startDiagnosticHeartbeat() { } heartbeatInterval = setInterval(() => { const now = Date.now(); + pruneSessionStates(now, true); const activeCount = Array.from(sessionStates.values()).filter( (s) => s.state === "processing", ).length; @@ -363,4 +401,19 @@ export function stopDiagnosticHeartbeat() { } } +export function getDiagnosticSessionStateCountForTest(): number { + return sessionStates.size; +} + +export function resetDiagnosticStateForTest(): void { + sessionStates.clear(); + webhookStats.received = 0; + webhookStats.processed = 0; + webhookStats.errors = 0; + webhookStats.lastReceived = 0; + lastActivityAt = 0; + lastSessionPruneAt = 0; + stopDiagnosticHeartbeat(); +} + export { diag as diagnosticLogger };