diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 65321c4bb..475232103 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -443,6 +443,137 @@ describe("subagent announce formatting", () => { expect(new Set(idempotencyKeys).size).toBe(2); }); + it("prefers direct delivery first for completion-mode and then queues on direct failure", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-collect", + lastChannel: "whatsapp", + lastTo: "+1555", + queueMode: "collect", + queueDebounceMs: 0, + }, + }; + agentSpy.mockRejectedValueOnce(new Error("direct delivery unavailable")); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-completion-direct-fallback", + requesterSessionKey: "main", + requesterDisplayKey: "main", + expectsCompletionMessage: true, + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(2); + expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({ + method: "agent", + params: { sessionKey: "agent:main:main" }, + }); + expect(agentSpy.mock.calls[1]?.[0]).toMatchObject({ + method: "agent", + params: { sessionKey: "agent:main:main" }, + }); + expect(agentSpy.mock.calls[1]?.[0]).toMatchObject({ + method: "agent", + params: { channel: "whatsapp", to: "+1555", deliver: true }, + }); + }); + + it("returns failure for completion-mode when direct delivery fails and queue fallback is unavailable", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-direct-only", + lastChannel: "whatsapp", + lastTo: "+1555", + }, + }; + agentSpy.mockRejectedValueOnce(new Error("direct delivery unavailable")); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-completion-direct-fail", + requesterSessionKey: "main", + requesterDisplayKey: "main", + expectsCompletionMessage: true, + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(false); + expect(agentSpy).toHaveBeenCalledTimes(1); + }); + + it("uses assistant output for completion-mode when latest assistant text exists", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "toolResult", + content: [{ type: "text", text: "old tool output" }], + }, + { + role: "assistant", + content: [{ type: "text", text: "assistant completion text" }], + }, + ], + }); + readLatestAssistantReplyMock.mockResolvedValue("assistant ignored fallback"); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-completion-assistant-output", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + expectsCompletionMessage: true, + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(1); + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + const msg = call?.params?.message as string; + expect(msg).toContain("assistant completion text"); + expect(msg).not.toContain("old tool output"); + }); + + it("falls back to latest tool output for completion-mode when assistant output is empty", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "" }], + }, + { + role: "toolResult", + content: [{ type: "text", text: "tool output only" }], + }, + ], + }); + readLatestAssistantReplyMock.mockResolvedValue(""); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-completion-tool-output", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + expectsCompletionMessage: true, + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(1); + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + const msg = call?.params?.message as string; + expect(msg).toContain("tool output only"); + }); + it("queues announce delivery back into requester subagent session", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); diff --git a/src/agents/subagent-registry.steer-restart.test.ts b/src/agents/subagent-registry.steer-restart.test.ts index c61fdd179..95c1ff506 100644 --- a/src/agents/subagent-registry.steer-restart.test.ts +++ b/src/agents/subagent-registry.steer-restart.test.ts @@ -239,4 +239,67 @@ describe("subagent registry steer restarts", () => { expect(childRunIds.filter((id) => id === "run-parent")).toHaveLength(2); expect(childRunIds.filter((id) => id === "run-child")).toHaveLength(1); }); + + it("retries completion-mode announce delivery with backoff and then gives up after retry limit", async () => { + const callGateway = vi.mocked((await import("../gateway/call.js")).callGateway); + const originalCallGateway = callGateway.getMockImplementation(); + callGateway.mockImplementation(async (request: unknown) => { + const typed = request as { method?: string }; + if (typed.method === "agent.wait") { + return new Promise(() => undefined); + } + if (originalCallGateway) { + return originalCallGateway(request as Parameters[0]); + } + return {}; + }); + + vi.useFakeTimers(); + try { + announceSpy.mockResolvedValue(false); + + mod.registerSubagentRun({ + runId: "run-completion-retry", + childSessionKey: "agent:main:subagent:completion", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "completion retry", + cleanup: "keep", + expectsCompletionMessage: true, + }); + + lifecycleHandler?.({ + stream: "lifecycle", + runId: "run-completion-retry", + data: { phase: "end" }, + }); + + await vi.advanceTimersByTimeAsync(0); + expect(announceSpy).toHaveBeenCalledTimes(1); + expect(mod.listSubagentRunsForRequester("agent:main:main")[0]?.announceRetryCount).toBe(1); + + await vi.advanceTimersByTimeAsync(999); + expect(announceSpy).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(1); + expect(announceSpy).toHaveBeenCalledTimes(2); + expect(mod.listSubagentRunsForRequester("agent:main:main")[0]?.announceRetryCount).toBe(2); + + await vi.advanceTimersByTimeAsync(1_999); + expect(announceSpy).toHaveBeenCalledTimes(2); + await vi.advanceTimersByTimeAsync(1); + expect(announceSpy).toHaveBeenCalledTimes(3); + expect(mod.listSubagentRunsForRequester("agent:main:main")[0]?.announceRetryCount).toBe(3); + + await vi.advanceTimersByTimeAsync(10_000); + expect(announceSpy).toHaveBeenCalledTimes(3); + expect(mod.listSubagentRunsForRequester("agent:main:main")[0]?.cleanupCompletedAt).toBeTypeOf( + "number", + ); + } finally { + if (originalCallGateway) { + callGateway.mockImplementation(originalCallGateway); + } + vi.useRealTimers(); + } + }); });