diff --git a/src/auto-reply/dispatch.test.ts b/src/auto-reply/dispatch.test.ts index b07f720ab..9e9630c40 100644 --- a/src/auto-reply/dispatch.test.ts +++ b/src/auto-reply/dispatch.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; import type { ReplyDispatcher } from "./reply/reply-dispatcher.js"; -import { withReplyDispatcher } from "./dispatch.js"; +import { dispatchInboundMessage, withReplyDispatcher } from "./dispatch.js"; +import { buildTestCtx } from "./reply/test-ctx.js"; function createDispatcher(record: string[]): ReplyDispatcher { return { @@ -58,4 +60,32 @@ describe("withReplyDispatcher", () => { expect(onSettled).toHaveBeenCalledTimes(1); expect(order).toEqual(["run", "markComplete", "waitForIdle", "onSettled"]); }); + + it("dispatchInboundMessage owns dispatcher lifecycle", async () => { + const order: string[] = []; + const dispatcher = { + sendToolResult: () => true, + sendBlockReply: () => true, + sendFinalReply: () => { + order.push("sendFinalReply"); + return true; + }, + getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), + markComplete: () => { + order.push("markComplete"); + }, + waitForIdle: async () => { + order.push("waitForIdle"); + }, + } satisfies ReplyDispatcher; + + await dispatchInboundMessage({ + ctx: buildTestCtx(), + cfg: {} as OpenClawConfig, + dispatcher, + replyResolver: async () => ({ text: "ok" }), + }); + + expect(order).toEqual(["sendFinalReply", "markComplete", "waitForIdle"]); + }); }); diff --git a/src/auto-reply/dispatch.ts b/src/auto-reply/dispatch.ts index 32f89beb1..54bf79a7b 100644 --- a/src/auto-reply/dispatch.ts +++ b/src/auto-reply/dispatch.ts @@ -40,12 +40,16 @@ export async function dispatchInboundMessage(params: { replyResolver?: typeof import("./reply.js").getReplyFromConfig; }): Promise { const finalized = finalizeInboundContext(params.ctx); - return await dispatchReplyFromConfig({ - ctx: finalized, - cfg: params.cfg, + return await withReplyDispatcher({ dispatcher: params.dispatcher, - replyOptions: params.replyOptions, - replyResolver: params.replyResolver, + run: () => + dispatchReplyFromConfig({ + ctx: finalized, + cfg: params.cfg, + dispatcher: params.dispatcher, + replyOptions: params.replyOptions, + replyResolver: params.replyResolver, + }), }); } @@ -59,23 +63,20 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: { const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping( params.dispatcherOptions, ); - return await withReplyDispatcher({ - dispatcher, - run: async () => - dispatchInboundMessage({ - ctx: params.ctx, - cfg: params.cfg, - dispatcher, - replyResolver: params.replyResolver, - replyOptions: { - ...params.replyOptions, - ...replyOptions, - }, - }), - onSettled: () => { - markDispatchIdle(); - }, - }); + try { + return await dispatchInboundMessage({ + ctx: params.ctx, + cfg: params.cfg, + dispatcher, + replyResolver: params.replyResolver, + replyOptions: { + ...params.replyOptions, + ...replyOptions, + }, + }); + } finally { + markDispatchIdle(); + } } export async function dispatchInboundMessageWithDispatcher(params: { @@ -86,15 +87,11 @@ export async function dispatchInboundMessageWithDispatcher(params: { replyResolver?: typeof import("./reply.js").getReplyFromConfig; }): Promise { const dispatcher = createReplyDispatcher(params.dispatcherOptions); - return await withReplyDispatcher({ + return await dispatchInboundMessage({ + ctx: params.ctx, + cfg: params.cfg, dispatcher, - run: async () => - dispatchInboundMessage({ - ctx: params.ctx, - cfg: params.cfg, - dispatcher, - replyResolver: params.replyResolver, - replyOptions: params.replyOptions, - }), + replyResolver: params.replyResolver, + replyOptions: params.replyOptions, }); } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 0f2cae6b4..45bd75040 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -278,7 +278,6 @@ export async function dispatchReplyFromConfig(params: { } else { queuedFinal = dispatcher.sendFinalReply(payload); } - await dispatcher.waitForIdle(); const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; recordProcessed("completed", { reason: "fast_abort" }); @@ -443,8 +442,6 @@ export async function dispatchReplyFromConfig(params: { } } - await dispatcher.waitForIdle(); - const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; recordProcessed("completed"); @@ -454,9 +451,5 @@ export async function dispatchReplyFromConfig(params: { recordProcessed("error", { error: String(err) }); markIdle("message_error"); throw err; - } finally { - // Always clear the dispatcher reservation so a leaked pending count - // can never permanently block gateway restarts. - dispatcher.markComplete(); } } diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 619d120ca..5e26257f3 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -20,7 +20,14 @@ vi.mock("../../auto-reply/reply/dispatch-from-config.js", () => ({ vi.mock("../../auto-reply/reply/reply-dispatcher.js", () => ({ createReplyDispatcherWithTyping: vi.fn(() => ({ - dispatcher: {}, + dispatcher: { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }, replyOptions: {}, markDispatchIdle: vi.fn(), })), diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index b099364cb..28ea99b60 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -6,7 +6,7 @@ import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { resolveSessionFilePath } from "../../config/sessions.js"; @@ -524,40 +524,36 @@ export const chatHandlers: GatewayRequestHandlers = { }); let agentRunStarted = false; - void withReplyDispatcher({ + void dispatchInboundMessage({ + ctx, + cfg, dispatcher, - run: () => - dispatchInboundMessage({ - ctx, - cfg, - dispatcher, - replyOptions: { - runId: clientRunId, - abortSignal: abortController.signal, - images: parsedImages.length > 0 ? parsedImages : undefined, - disableBlockStreaming: true, - onAgentRunStart: (runId) => { - agentRunStarted = true; - const connId = typeof client?.connId === "string" ? client.connId : undefined; - const wantsToolEvents = hasGatewayClientCap( - client?.connect?.caps, - GATEWAY_CLIENT_CAPS.TOOL_EVENTS, - ); - if (connId && wantsToolEvents) { - context.registerToolEventRecipient(runId, connId); - // Register for any other active runs *in the same session* so - // late-joining clients (e.g. page refresh mid-response) receive - // in-progress tool events without leaking cross-session data. - for (const [activeRunId, active] of context.chatAbortControllers) { - if (activeRunId !== runId && active.sessionKey === p.sessionKey) { - context.registerToolEventRecipient(activeRunId, connId); - } - } + replyOptions: { + runId: clientRunId, + abortSignal: abortController.signal, + images: parsedImages.length > 0 ? parsedImages : undefined, + disableBlockStreaming: true, + onAgentRunStart: (runId) => { + agentRunStarted = true; + const connId = typeof client?.connId === "string" ? client.connId : undefined; + const wantsToolEvents = hasGatewayClientCap( + client?.connect?.caps, + GATEWAY_CLIENT_CAPS.TOOL_EVENTS, + ); + if (connId && wantsToolEvents) { + context.registerToolEventRecipient(runId, connId); + // Register for any other active runs *in the same session* so + // late-joining clients (e.g. page refresh mid-response) receive + // in-progress tool events without leaking cross-session data. + for (const [activeRunId, active] of context.chatAbortControllers) { + if (activeRunId !== runId && active.sessionKey === p.sessionKey) { + context.registerToolEventRecipient(activeRunId, connId); } - }, - onModelSelected, - }, - }), + } + } + }, + onModelSelected, + }, }) .then(() => { if (!agentRunStarted) { diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 771003f2f..445fe73ae 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -3,7 +3,7 @@ import type { IMessagePayload, MonitorIMessageOpts } from "./types.js"; import { resolveHumanDelayConfig } from "../../agents/identity.js"; import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import { hasControlCommand } from "../../auto-reply/command-detection.js"; -import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { formatInboundEnvelope, formatInboundFromLabel, @@ -647,21 +647,17 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); - const { queuedFinal } = await withReplyDispatcher({ + const { queuedFinal } = await dispatchInboundMessage({ + ctx: ctxPayload, + cfg, dispatcher, - run: () => - dispatchInboundMessage({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - disableBlockStreaming: - typeof accountInfo.config.blockStreaming === "boolean" - ? !accountInfo.config.blockStreaming - : undefined, - onModelSelected, - }, - }), + replyOptions: { + disableBlockStreaming: + typeof accountInfo.config.blockStreaming === "boolean" + ? !accountInfo.config.blockStreaming + : undefined, + onModelSelected, + }, }); if (!queuedFinal) {