Gateway: bound agent run sequence tracking

This commit is contained in:
Vignesh Natarajan
2026-02-14 17:50:49 -08:00
parent 451deb066f
commit fc8f59261a
5 changed files with 68 additions and 1 deletions

View File

@@ -88,8 +88,12 @@ export function abortChatRunById(
ops.chatAbortControllers.delete(runId);
ops.chatRunBuffers.delete(runId);
ops.chatDeltaSentAt.delete(runId);
ops.removeChatRun(runId, runId, sessionKey);
const removed = ops.removeChatRun(runId, runId, sessionKey);
broadcastChatAborted(ops, { runId, sessionKey, stopReason });
ops.agentRunSeq.delete(runId);
if (removed?.clientRunId) {
ops.agentRunSeq.delete(removed.clientRunId);
}
return { aborted: true };
}

View File

@@ -131,6 +131,52 @@ describe("agent event handler", () => {
nowSpy.mockRestore();
});
it("cleans up agent run sequence tracking when lifecycle completes", () => {
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(2_500);
const broadcast = vi.fn();
const broadcastToConnIds = vi.fn();
const nodeSendToSession = vi.fn();
const agentRunSeq = new Map<string, number>();
const chatRunState = createChatRunState();
const toolEventRecipients = createToolEventRecipientRegistry();
chatRunState.registry.add("run-cleanup", {
sessionKey: "session-cleanup",
clientRunId: "client-cleanup",
});
const handler = createAgentEventHandler({
broadcast,
broadcastToConnIds,
nodeSendToSession,
agentRunSeq,
chatRunState,
resolveSessionKeyForRun: () => undefined,
clearAgentRunContext: vi.fn(),
toolEventRecipients,
});
handler({
runId: "run-cleanup",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "done" },
});
expect(agentRunSeq.get("run-cleanup")).toBe(1);
handler({
runId: "run-cleanup",
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
});
expect(agentRunSeq.has("run-cleanup")).toBe(false);
expect(agentRunSeq.has("client-cleanup")).toBe(false);
nowSpy.mockRestore();
});
it("routes tool events only to registered recipients when verbose is enabled", () => {
const broadcast = vi.fn();
const broadcastToConnIds = vi.fn();

View File

@@ -419,6 +419,8 @@ export function createAgentEventHandler({
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
toolEventRecipients.markFinal(evt.runId);
clearAgentRunContext(evt.runId);
agentRunSeq.delete(evt.runId);
agentRunSeq.delete(clientRunId);
}
};
}

View File

@@ -73,6 +73,7 @@ export function startGatewayMaintenanceTimers(params: {
// dedupe cache cleanup
const dedupeCleanup = setInterval(() => {
const AGENT_RUN_SEQ_MAX = 10_000;
const now = Date.now();
for (const [k, v] of params.dedupe) {
if (now - v.ts > DEDUPE_TTL_MS) {
@@ -86,6 +87,18 @@ export function startGatewayMaintenanceTimers(params: {
}
}
if (params.agentRunSeq.size > AGENT_RUN_SEQ_MAX) {
const excess = params.agentRunSeq.size - AGENT_RUN_SEQ_MAX;
let removed = 0;
for (const runId of params.agentRunSeq.keys()) {
params.agentRunSeq.delete(runId);
removed += 1;
if (removed >= excess) {
break;
}
}
}
for (const [runId, entry] of params.chatAbortControllers) {
if (now <= entry.expiresAtMs) {
continue;

View File

@@ -189,6 +189,7 @@ function broadcastChatFinal(params: {
};
params.context.broadcast("chat", payload);
params.context.nodeSendToSession(params.sessionKey, "chat", payload);
params.context.agentRunSeq.delete(params.runId);
}
function broadcastChatError(params: {
@@ -207,6 +208,7 @@ function broadcastChatError(params: {
};
params.context.broadcast("chat", payload);
params.context.nodeSendToSession(params.sessionKey, "chat", payload);
params.context.agentRunSeq.delete(params.runId);
}
export const chatHandlers: GatewayRequestHandlers = {