From fc8f59261a4cc730a2bbe64f6c2c9580dd71e586 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 14 Feb 2026 17:50:49 -0800 Subject: [PATCH] Gateway: bound agent run sequence tracking --- src/gateway/chat-abort.ts | 6 ++- src/gateway/server-chat.agent-events.test.ts | 46 ++++++++++++++++++++ src/gateway/server-chat.ts | 2 + src/gateway/server-maintenance.ts | 13 ++++++ src/gateway/server-methods/chat.ts | 2 + 5 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index d1dd2ec87..12c47f5b1 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -88,8 +88,12 @@ export function abortChatRunById( ops.chatAbortControllers.delete(runId); ops.chatRunBuffers.delete(runId); ops.chatDeltaSentAt.delete(runId); - ops.removeChatRun(runId, runId, sessionKey); + const removed = ops.removeChatRun(runId, runId, sessionKey); broadcastChatAborted(ops, { runId, sessionKey, stopReason }); + ops.agentRunSeq.delete(runId); + if (removed?.clientRunId) { + ops.agentRunSeq.delete(removed.clientRunId); + } return { aborted: true }; } diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index a4a4c508e..9978416a4 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -131,6 +131,52 @@ describe("agent event handler", () => { nowSpy.mockRestore(); }); + it("cleans up agent run sequence tracking when lifecycle completes", () => { + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(2_500); + const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); + const nodeSendToSession = vi.fn(); + const agentRunSeq = new Map(); + const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); + chatRunState.registry.add("run-cleanup", { + sessionKey: "session-cleanup", + clientRunId: "client-cleanup", + }); + + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun: () => undefined, + clearAgentRunContext: vi.fn(), + toolEventRecipients, + }); + + handler({ + runId: "run-cleanup", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "done" }, + }); + expect(agentRunSeq.get("run-cleanup")).toBe(1); + + handler({ + runId: "run-cleanup", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + expect(agentRunSeq.has("run-cleanup")).toBe(false); + expect(agentRunSeq.has("client-cleanup")).toBe(false); + nowSpy.mockRestore(); + }); + it("routes tool events only to registered recipients when verbose is enabled", () => { const broadcast = vi.fn(); const broadcastToConnIds = vi.fn(); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index a200f0731..eff745595 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -419,6 +419,8 @@ export function createAgentEventHandler({ if (lifecyclePhase === "end" || lifecyclePhase === "error") { toolEventRecipients.markFinal(evt.runId); clearAgentRunContext(evt.runId); + agentRunSeq.delete(evt.runId); + agentRunSeq.delete(clientRunId); } }; } diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 898e8ef74..bb3b68dd4 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -73,6 +73,7 @@ export function startGatewayMaintenanceTimers(params: { // dedupe cache cleanup const dedupeCleanup = setInterval(() => { + const AGENT_RUN_SEQ_MAX = 10_000; const now = Date.now(); for (const [k, v] of params.dedupe) { if (now - v.ts > DEDUPE_TTL_MS) { @@ -86,6 +87,18 @@ export function startGatewayMaintenanceTimers(params: { } } + if (params.agentRunSeq.size > AGENT_RUN_SEQ_MAX) { + const excess = params.agentRunSeq.size - AGENT_RUN_SEQ_MAX; + let removed = 0; + for (const runId of params.agentRunSeq.keys()) { + params.agentRunSeq.delete(runId); + removed += 1; + if (removed >= excess) { + break; + } + } + } + for (const [runId, entry] of params.chatAbortControllers) { if (now <= entry.expiresAtMs) { continue; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 4966e1575..1049f37e2 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -189,6 +189,7 @@ function broadcastChatFinal(params: { }; params.context.broadcast("chat", payload); params.context.nodeSendToSession(params.sessionKey, "chat", payload); + params.context.agentRunSeq.delete(params.runId); } function broadcastChatError(params: { @@ -207,6 +208,7 @@ function broadcastChatError(params: { }; params.context.broadcast("chat", payload); params.context.nodeSendToSession(params.sessionKey, "chat", payload); + params.context.agentRunSeq.delete(params.runId); } export const chatHandlers: GatewayRequestHandlers = {