diff --git a/src/logging/diagnostic-session-state.ts b/src/logging/diagnostic-session-state.ts new file mode 100644 index 000000000..c060866ff --- /dev/null +++ b/src/logging/diagnostic-session-state.ts @@ -0,0 +1,91 @@ +export type SessionStateValue = "idle" | "processing" | "waiting"; /* The three state labels a tracked session can carry. */ + +export type SessionState = { /* One tracked session: optional identifiers, last-activity timestamp, current state label and queue depth. */ + sessionId?: string; + sessionKey?: string; + lastActivity: number; /* Written from Date.now() in this module, i.e. epoch milliseconds. */ + state: SessionStateValue; + queueDepth: number; +}; + +export type SessionRef = { /* Identifies a session by sessionId and/or sessionKey; both fields are optional. */ + sessionId?: string; + sessionKey?: string; +}; + +export const diagnosticSessionStates = new Map(); /* Registry of SessionState entries, keyed by the string resolveSessionKey returns. NOTE(review): no type arguments are visible on this Map as shown here; confirm the key and value typing in the real file. */ + +const SESSION_STATE_TTL_MS = 30 * 60 * 1000; /* 30 minutes: idle entries inactive for longer than this are removed by the prune pass. */ +const SESSION_STATE_PRUNE_INTERVAL_MS = 60 * 1000; /* 60 seconds: minimum gap between prune passes that are neither forced nor triggered by size. */ +const SESSION_STATE_MAX_ENTRIES = 2000; /* Upper bound on the number of tracked sessions after a prune pass. */ + +let lastSessionPruneAt = 0; /* Value of now at the most recent prune pass that actually ran; 0 until the first pass and after a test reset. */ + +export function pruneDiagnosticSessionStates(now = Date.now(), force = false): void { /* Removes expired idle entries, then evicts the least recently active entries until at most SESSION_STATE_MAX_ENTRIES remain. The call returns without doing anything when force is false, the map is not over the cap, and less than SESSION_STATE_PRUNE_INTERVAL_MS has elapsed since the previous pass. Param now: reference time in ms (defaults to Date.now()). Param force: bypasses the interval throttle. */ + const shouldPruneForSize = diagnosticSessionStates.size > SESSION_STATE_MAX_ENTRIES; + if (!force && !shouldPruneForSize && now - lastSessionPruneAt < SESSION_STATE_PRUNE_INTERVAL_MS) { + return; + } + lastSessionPruneAt = now; + + for (const [key, state] of diagnosticSessionStates.entries()) { /* Pass 1: drop entries that are idle, have no queued work and have been inactive longer than the TTL. */ + const ageMs = now - state.lastActivity; + const isIdle = state.state === "idle"; + if (isIdle && state.queueDepth <= 0 && ageMs > SESSION_STATE_TTL_MS) { + diagnosticSessionStates.delete(key); + } + } + + if (diagnosticSessionStates.size <= SESSION_STATE_MAX_ENTRIES) { + return; + } + const excess = diagnosticSessionStates.size - SESSION_STATE_MAX_ENTRIES; /* Pass 2: still over the cap, so this many entries must go. */ + const ordered = Array.from(diagnosticSessionStates.entries()).toSorted( /* Ascending lastActivity, so the least recently active entries come first. NOTE(review): ordering looks at lastActivity only, not state or queueDepth, so non-idle entries can be evicted here. */ + (a, b) => a[1].lastActivity - b[1].lastActivity, + ); + for (let i = 0; i < excess; i += 1) { + const key = ordered[i]?.[0]; + if (!key) { /* NOTE(review): an empty-string key is falsy and would also end the trim loop early. */ + break; + } + diagnosticSessionStates.delete(key); + } +} + +function resolveSessionKey({ sessionKey, sessionId }: SessionRef) { /* Map key for a session: sessionKey when it is not null or undefined, otherwise sessionId, otherwise a fixed fallback string. */ + return sessionKey ?? sessionId ?? 
"unknown"; +} + +export function getDiagnosticSessionState(ref: SessionRef): SessionState { /* Returns the tracked entry for ref, creating it when absent. A throttled prune runs first. The returned object is the one stored in the map, so caller mutations are visible to every other reader of the registry. */ + pruneDiagnosticSessionStates(); + const key = resolveSessionKey(ref); + const existing = diagnosticSessionStates.get(key); + if (existing) { /* Hit: refresh the stored identifiers from any truthy values supplied in ref; other fields are left untouched. */ + if (ref.sessionId) { + existing.sessionId = ref.sessionId; + } + if (ref.sessionKey) { + existing.sessionKey = ref.sessionKey; + } + return existing; + } + const created: SessionState = { /* Miss: new entries start idle with an empty queue and the current time as last activity. */ + sessionId: ref.sessionId, + sessionKey: ref.sessionKey, + lastActivity: Date.now(), + state: "idle", + queueDepth: 0, + }; + diagnosticSessionStates.set(key, created); + pruneDiagnosticSessionStates(Date.now(), true); /* Forced pass right after the insert, so the size cap is applied immediately. */ + return created; +} + +export function getDiagnosticSessionStateCountForTest(): number { /* Test helper: number of entries currently in the registry. */ + return diagnosticSessionStates.size; +} + +export function resetDiagnosticSessionStateForTest(): void { /* Test helper: empties the registry and resets the prune throttle timestamp. */ + diagnosticSessionStates.clear(); + lastSessionPruneAt = 0; +} diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 75f6d942e..c7cd72f5c 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -1,34 +1,34 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { getDiagnosticSessionStateCountForTest, - logSessionStateChange, - resetDiagnosticStateForTest, -} from "./diagnostic.js"; + getDiagnosticSessionState, + resetDiagnosticSessionStateForTest, +} from "./diagnostic-session-state.js"; describe("diagnostic session state pruning", () => { beforeEach(() => { vi.useFakeTimers(); - resetDiagnosticStateForTest(); + resetDiagnosticSessionStateForTest(); }); afterEach(() => { - resetDiagnosticStateForTest(); + resetDiagnosticSessionStateForTest(); vi.useRealTimers(); }); it("evicts stale idle session states", () => { - logSessionStateChange({ sessionId: "stale-1", state: "idle" }); + getDiagnosticSessionState({ sessionId: "stale-1" }); expect(getDiagnosticSessionStateCountForTest()).toBe(1); vi.advanceTimersByTime(31 * 60 * 1000); - logSessionStateChange({ 
sessionId: "fresh-1", state: "idle" }); + getDiagnosticSessionState({ sessionId: "fresh-1" }); expect(getDiagnosticSessionStateCountForTest()).toBe(1); }); it("caps tracked session states to a bounded max", () => { for (let i = 0; i < 2001; i += 1) { - logSessionStateChange({ sessionId: `session-${i}`, state: "idle" }); + getDiagnosticSessionState({ sessionId: `session-${i}` }); } expect(getDiagnosticSessionStateCountForTest()).toBe(2000); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index e5e8de402..73a3ac893 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -1,28 +1,17 @@ import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; +import { + diagnosticSessionStates, + getDiagnosticSessionState, + getDiagnosticSessionStateCountForTest as getDiagnosticSessionStateCountForTestImpl, + pruneDiagnosticSessionStates, + resetDiagnosticSessionStateForTest, + type SessionRef, + type SessionStateValue, +} from "./diagnostic-session-state.js"; import { createSubsystemLogger } from "./subsystem.js"; const diag = createSubsystemLogger("diagnostic"); -type SessionStateValue = "idle" | "processing" | "waiting"; - -type SessionState = { - sessionId?: string; - sessionKey?: string; - lastActivity: number; - state: SessionStateValue; - queueDepth: number; -}; - -type SessionRef = { - sessionId?: string; - sessionKey?: string; -}; - -const sessionStates = new Map(); -const SESSION_STATE_TTL_MS = 30 * 60 * 1000; -const SESSION_STATE_PRUNE_INTERVAL_MS = 60 * 1000; -const SESSION_STATE_MAX_ENTRIES = 2000; - const webhookStats = { received: 0, processed: 0, @@ -31,72 +20,11 @@ const webhookStats = { }; let lastActivityAt = 0; -let lastSessionPruneAt = 0; function markActivity() { lastActivityAt = Date.now(); } -function pruneSessionStates(now = Date.now(), force = false): void { - const shouldPruneForSize = sessionStates.size > SESSION_STATE_MAX_ENTRIES; - if (!force && !shouldPruneForSize && now - lastSessionPruneAt < 
SESSION_STATE_PRUNE_INTERVAL_MS) { - return; - } - lastSessionPruneAt = now; - - for (const [key, state] of sessionStates.entries()) { - const ageMs = now - state.lastActivity; - const isIdle = state.state === "idle"; - if (isIdle && state.queueDepth <= 0 && ageMs > SESSION_STATE_TTL_MS) { - sessionStates.delete(key); - } - } - - if (sessionStates.size <= SESSION_STATE_MAX_ENTRIES) { - return; - } - const excess = sessionStates.size - SESSION_STATE_MAX_ENTRIES; - const ordered = Array.from(sessionStates.entries()).toSorted( - (a, b) => a[1].lastActivity - b[1].lastActivity, - ); - for (let i = 0; i < excess; i += 1) { - const key = ordered[i]?.[0]; - if (!key) { - break; - } - sessionStates.delete(key); - } -} - -function resolveSessionKey({ sessionKey, sessionId }: SessionRef) { - return sessionKey ?? sessionId ?? "unknown"; -} - -function getSessionState(ref: SessionRef): SessionState { - pruneSessionStates(); - const key = resolveSessionKey(ref); - const existing = sessionStates.get(key); - if (existing) { - if (ref.sessionId) { - existing.sessionId = ref.sessionId; - } - if (ref.sessionKey) { - existing.sessionKey = ref.sessionKey; - } - return existing; - } - const created: SessionState = { - sessionId: ref.sessionId, - sessionKey: ref.sessionKey, - lastActivity: Date.now(), - state: "idle", - queueDepth: 0, - }; - sessionStates.set(key, created); - pruneSessionStates(Date.now(), true); - return created; -} - export function logWebhookReceived(params: { channel: string; updateType?: string; @@ -174,7 +102,7 @@ export function logMessageQueued(params: { channel?: string; source: string; }) { - const state = getSessionState(params); + const state = getDiagnosticSessionState(params); state.queueDepth += 1; state.lastActivity = Date.now(); if (diag.isEnabled("debug")) { @@ -244,7 +172,7 @@ export function logSessionStateChange( reason?: string; }, ) { - const state = getSessionState(params); + const state = getDiagnosticSessionState(params); const isProbeSession = 
state.sessionId?.startsWith("probe-") ?? false; const prevState = state.state; state.state = params.state; @@ -274,7 +202,7 @@ export function logSessionStateChange( } export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) { - const state = getSessionState(params); + const state = getDiagnosticSessionState(params); diag.warn( `stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ state.sessionKey ?? "unknown" @@ -329,7 +257,7 @@ export function logRunAttempt(params: SessionRef & { runId: string; attempt: num } export function logActiveRuns() { - const activeSessions = Array.from(sessionStates.entries()) + const activeSessions = Array.from(diagnosticSessionStates.entries()) .filter(([, s]) => s.state === "processing") .map( ([id, s]) => @@ -347,14 +275,14 @@ export function startDiagnosticHeartbeat() { } heartbeatInterval = setInterval(() => { const now = Date.now(); - pruneSessionStates(now, true); - const activeCount = Array.from(sessionStates.values()).filter( + pruneDiagnosticSessionStates(now, true); + const activeCount = Array.from(diagnosticSessionStates.values()).filter( (s) => s.state === "processing", ).length; - const waitingCount = Array.from(sessionStates.values()).filter( + const waitingCount = Array.from(diagnosticSessionStates.values()).filter( (s) => s.state === "waiting", ).length; - const totalQueued = Array.from(sessionStates.values()).reduce( + const totalQueued = Array.from(diagnosticSessionStates.values()).reduce( (sum, s) => sum + s.queueDepth, 0, ); @@ -386,7 +314,7 @@ export function startDiagnosticHeartbeat() { queued: totalQueued, }); - for (const [, state] of sessionStates) { + for (const [, state] of diagnosticSessionStates) { const ageMs = now - state.lastActivity; if (state.state === "processing" && ageMs > 120_000) { logSessionStuck({ @@ -409,17 +337,16 @@ export function stopDiagnosticHeartbeat() { } export function getDiagnosticSessionStateCountForTest(): number { - return 
sessionStates.size; + return getDiagnosticSessionStateCountForTestImpl(); } export function resetDiagnosticStateForTest(): void { - sessionStates.clear(); + resetDiagnosticSessionStateForTest(); webhookStats.received = 0; webhookStats.processed = 0; webhookStats.errors = 0; webhookStats.lastReceived = 0; lastActivityAt = 0; - lastSessionPruneAt = 0; stopDiagnosticHeartbeat(); }