From 3efb7521249289dfd75ab2c20ca432f07c443fec Mon Sep 17 00:00:00 2001 From: Vishal Doshi Date: Sun, 15 Feb 2026 04:12:33 +0530 Subject: [PATCH] fix(gateway): abort active runs during sessions.reset (#16576) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 43da87f2dfd38133210f98422255705d09ae7922 Co-authored-by: Grynn <212880+Grynn@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + src/gateway/server-methods/sessions.ts | 59 ++++--- ...ions.gateway-server-sessions-a.e2e.test.ts | 150 ++++++++++++++++++ 3 files changed, 189 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 686803c49..f2e4eacb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai - Agents: keep unresolved mutating tool failures visible until the same action retry succeeds, scope mutation-error surfacing to mutating calls (including `session_status` model changes), and dedupe duplicate failure warnings in outbound replies. (#16131) Thanks @Swader. - Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior. (#9855) Thanks @mverrilli. - Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla. +- Gateway/Sessions: abort active embedded runs and clear queued session work before `sessions.reset`, returning unavailable if the run does not stop in time. (#16576) Thanks @Grynn. - BlueBubbles: include sender identity in group chat envelopes and pass clean message text to the agent prompt, aligning with iMessage/Signal formatting. (#16210) Thanks @zerone0x. - WhatsApp: honor per-account `dmPolicy` overrides (account-level settings now take precedence over channel defaults for inbound DMs). (#10082) Thanks @mcaxtr. - Media: accept `MEDIA:`-prefixed paths (lenient whitespace) when loading outbound media to prevent `ENOENT` for tool-returned local media paths. (#13107) Thanks @mcaxtr. diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index eb6618989..f27c6b8e3 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -88,6 +88,33 @@ function archiveSessionTranscriptsForSession(params: { }); } +async function ensureSessionRuntimeCleanup(params: { + cfg: ReturnType; + key: string; + target: ReturnType; + sessionId?: string; +}) { + const queueKeys = new Set(params.target.storeKeys); + queueKeys.add(params.target.canonicalKey); + if (params.sessionId) { + queueKeys.add(params.sessionId); + } + clearSessionQueues([...queueKeys]); + stopSubagentsForRequester({ cfg: params.cfg, requesterSessionKey: params.target.canonicalKey }); + if (!params.sessionId) { + return undefined; + } + abortEmbeddedPiRun(params.sessionId); + const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000); + if (ended) { + return undefined; + } + return errorShape( + ErrorCodes.UNAVAILABLE, + `Session ${params.key} is still active; try again in a moment.`, + ); +} + export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!validateSessionsListParams(params)) { @@ -278,6 +305,13 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const { entry } = loadSessionEntry(key); + const sessionId = entry?.sessionId; + const cleanupError = await ensureSessionRuntimeCleanup({ cfg, key, target, sessionId }); + if (cleanupError) { + respond(false, undefined, cleanupError); + return; + } const storePath = target.storePath; let oldSessionId: string | undefined; let oldSessionFile: string | undefined; @@ -360,27 +394,10 @@ export const sessionsHandlers: GatewayRequestHandlers = { const { entry } = loadSessionEntry(key); const sessionId = entry?.sessionId; const existed = Boolean(entry); - const queueKeys = new Set(target.storeKeys); - queueKeys.add(target.canonicalKey); - if (sessionId) { - queueKeys.add(sessionId); - } - clearSessionQueues([...queueKeys]); - stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey }); - if (sessionId) { - abortEmbeddedPiRun(sessionId); - const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000); - if (!ended) { - respond( - false, - undefined, - errorShape( - ErrorCodes.UNAVAILABLE, - `Session ${key} is still active; try again in a moment.`, - ), - ); - return; - } + const cleanupError = await ensureSessionRuntimeCleanup({ cfg, key, target, sessionId }); + if (cleanupError) { + respond(false, undefined, cleanupError); + return; } await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts index 1eb83fcf7..d6f66ef31 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts @@ -597,4 +597,154 @@ describe("gateway server sessions", () => { ws.close(); }); + + test("sessions.reset aborts active runs and clears queues", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + + await fs.writeFile( + path.join(dir, "sess-main.jsonl"), + `${JSON.stringify({ role: "user", content: "hello" })}\n`, + "utf-8", + ); + + await writeSessionStore({ + entries: { + main: { sessionId: "sess-main", updatedAt: Date.now() }, + }, + }); + + embeddedRunMock.activeIds.add("sess-main"); + embeddedRunMock.waitResults.set("sess-main", true); + + const { ws } = await openClient(); + + const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( + ws, + "sessions.reset", + { + key: "main", + }, + ); + expect(reset.ok).toBe(true); + expect(reset.payload?.key).toBe("agent:main:main"); + expect(reset.payload?.entry.sessionId).not.toBe("sess-main"); + expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({ + cfg: expect.any(Object), + requesterSessionKey: "agent:main:main", + }); + expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1); + const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[]; + expect(clearedKeys).toEqual(expect.arrayContaining(["main", "agent:main:main", "sess-main"])); + expect(embeddedRunMock.abortCalls).toEqual(["sess-main"]); + expect(embeddedRunMock.waitCalls).toEqual(["sess-main"]); + + ws.close(); + }); + + test("sessions.reset returns unavailable when active run does not stop", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + + await fs.writeFile( + path.join(dir, "sess-main.jsonl"), + `${JSON.stringify({ role: "user", content: "hello" })}\n`, + "utf-8", + ); + + await writeSessionStore({ + entries: { + main: { sessionId: "sess-main", updatedAt: Date.now() }, + }, + }); + + embeddedRunMock.activeIds.add("sess-main"); + embeddedRunMock.waitResults.set("sess-main", false); + + const { ws } = await openClient(); + + const reset = await rpcReq(ws, "sessions.reset", { + key: "main", + }); + expect(reset.ok).toBe(false); + expect(reset.error?.code).toBe("UNAVAILABLE"); + expect(reset.error?.message ?? "").toMatch(/still active/i); + expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({ + cfg: expect.any(Object), + requesterSessionKey: "agent:main:main", + }); + expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1); + const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[]; + expect(clearedKeys).toEqual(expect.arrayContaining(["main", "agent:main:main", "sess-main"])); + expect(embeddedRunMock.abortCalls).toEqual(["sess-main"]); + expect(embeddedRunMock.waitCalls).toEqual(["sess-main"]); + + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { sessionId?: string } + >; + expect(store["agent:main:main"]?.sessionId).toBe("sess-main"); + const filesAfterResetAttempt = await fs.readdir(dir); + expect(filesAfterResetAttempt.some((f) => f.startsWith("sess-main.jsonl.reset."))).toBe(false); + + ws.close(); + }); + + test("sessions.delete returns unavailable when active run does not stop", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + + await fs.writeFile( + path.join(dir, "sess-active.jsonl"), + `${JSON.stringify({ role: "user", content: "active" })}\n`, + "utf-8", + ); + + await writeSessionStore({ + entries: { + "discord:group:dev": { + sessionId: "sess-active", + updatedAt: Date.now(), + }, + }, + }); + + embeddedRunMock.activeIds.add("sess-active"); + embeddedRunMock.waitResults.set("sess-active", false); + + const { ws } = await openClient(); + + const deleted = await rpcReq(ws, "sessions.delete", { + key: "discord:group:dev", + }); + expect(deleted.ok).toBe(false); + expect(deleted.error?.code).toBe("UNAVAILABLE"); + expect(deleted.error?.message ?? "").toMatch(/still active/i); + expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({ + cfg: expect.any(Object), + requesterSessionKey: "agent:main:discord:group:dev", + }); + expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1); + const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[]; + expect(clearedKeys).toEqual( + expect.arrayContaining(["discord:group:dev", "agent:main:discord:group:dev", "sess-active"]), + ); + expect(embeddedRunMock.abortCalls).toEqual(["sess-active"]); + expect(embeddedRunMock.waitCalls).toEqual(["sess-active"]); + + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { sessionId?: string } + >; + expect(store["agent:main:discord:group:dev"]?.sessionId).toBe("sess-active"); + const filesAfterDeleteAttempt = await fs.readdir(dir); + expect(filesAfterDeleteAttempt.some((f) => f.startsWith("sess-active.jsonl.deleted."))).toBe( + false, + ); + + ws.close(); + }); });