From 07be14c02de6abc4e0fb8a51991ee8ca5dbbadc5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 16 Feb 2026 01:39:39 +0000 Subject: [PATCH] refactor(gateway): dedupe chat session abort flow --- src/gateway/server-methods/chat.ts | 95 +++++++++++++++--------------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 7d6c74a15..55b0d97dc 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -16,6 +16,7 @@ import { abortChatRunById, abortChatRunsForSessionKey, type ChatAbortControllerEntry, + type ChatAbortOps, isChatStopCommandText, resolveChatRunExpiresAtMs, } from "../chat-abort.js"; @@ -293,6 +294,46 @@ function persistAbortedPartials(params: { } } +function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps { + return { + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + chatDeltaSentAt: context.chatDeltaSentAt, + chatAbortedRuns: context.chatAbortedRuns, + removeChatRun: context.removeChatRun, + agentRunSeq: context.agentRunSeq, + broadcast: context.broadcast, + nodeSendToSession: context.nodeSendToSession, + }; +} + +function abortChatRunsForSessionKeyWithPartials(params: { + context: GatewayRequestContext; + ops: ChatAbortOps; + sessionKey: string; + abortOrigin: AbortOrigin; + stopReason?: string; +}) { + const snapshots = collectSessionAbortPartials({ + chatAbortControllers: params.context.chatAbortControllers, + chatRunBuffers: params.context.chatRunBuffers, + sessionKey: params.sessionKey, + abortOrigin: params.abortOrigin, + }); + const res = abortChatRunsForSessionKey(params.ops, { + sessionKey: params.sessionKey, + stopReason: params.stopReason, + }); + if (res.aborted) { + persistAbortedPartials({ + context: params.context, + sessionKey: params.sessionKey, + snapshots, + }); + } + return res; +} + function nextChatSeq(context: { agentRunSeq: Map }, runId: string) { const next = (context.agentRunSeq.get(runId) ?? 0) + 1; context.agentRunSeq.set(runId, next); @@ -408,35 +449,16 @@ export const chatHandlers: GatewayRequestHandlers = { runId?: string; }; - const ops = { - chatAbortControllers: context.chatAbortControllers, - chatRunBuffers: context.chatRunBuffers, - chatDeltaSentAt: context.chatDeltaSentAt, - chatAbortedRuns: context.chatAbortedRuns, - removeChatRun: context.removeChatRun, - agentRunSeq: context.agentRunSeq, - broadcast: context.broadcast, - nodeSendToSession: context.nodeSendToSession, - }; + const ops = createChatAbortOps(context); if (!runId) { - const snapshots = collectSessionAbortPartials({ - chatAbortControllers: context.chatAbortControllers, - chatRunBuffers: context.chatRunBuffers, + const res = abortChatRunsForSessionKeyWithPartials({ + context, + ops, sessionKey: rawSessionKey, abortOrigin: "rpc", - }); - const res = abortChatRunsForSessionKey(ops, { - sessionKey: rawSessionKey, stopReason: "rpc", }); - if (res.aborted) { - persistAbortedPartials({ - context, - sessionKey: rawSessionKey, - snapshots, - }); - } respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); return; } @@ -569,32 +591,13 @@ export const chatHandlers: GatewayRequestHandlers = { } if (stopCommand) { - const snapshots = collectSessionAbortPartials({ - chatAbortControllers: context.chatAbortControllers, - chatRunBuffers: context.chatRunBuffers, + const res = abortChatRunsForSessionKeyWithPartials({ + context, + ops: createChatAbortOps(context), sessionKey: rawSessionKey, abortOrigin: "stop-command", + stopReason: "stop", }); - const res = abortChatRunsForSessionKey( - { - chatAbortControllers: context.chatAbortControllers, - chatRunBuffers: context.chatRunBuffers, - chatDeltaSentAt: context.chatDeltaSentAt, - chatAbortedRuns: context.chatAbortedRuns, - removeChatRun: context.removeChatRun, - agentRunSeq: context.agentRunSeq, - broadcast: context.broadcast, - nodeSendToSession: context.nodeSendToSession, - }, - { sessionKey: rawSessionKey, stopReason: "stop" }, - ); - if (res.aborted) { - persistAbortedPartials({ - context, - sessionKey: rawSessionKey, - snapshots, - }); - } respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); return; }