diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a37de212..a82e9d3de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Agents/Subagents: default subagent spawn depth now uses shared `maxSpawnDepth=2`, enabling depth-1 orchestrator spawning by default while keeping depth policy checks consistent across spawn and prompt paths. (#22223) Thanks @tyler6204. - Channels/CLI: add per-account/channel `defaultTo` outbound routing fallback so `openclaw agent --deliver` can send without explicit `--reply-to` when a default target is configured. (#16985) Thanks @KirillShchetinin. - iOS/Chat: clean chat UI noise by stripping inbound untrusted metadata/timestamp prefixes, formatting tool outputs into concise summaries/errors, compacting the composer while typing, and supporting tap-to-dismiss keyboard in chat view. (#22122) thanks @mbelinky. - iOS/Watch: bridge mirrored watch prompt notification actions into iOS quick-reply handling, including queued action handoff until app model initialization. (#22123) thanks @mbelinky. @@ -19,6 +20,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/Subagents: restore announce-chain delivery to agent injection, defer nested announce output until descendant follow-up content is ready, and prevent descendant deferrals from consuming announce retry budget so deep chains do not drop final completions. (#22223) Thanks @tyler6204. - Gateway/Auth: require `gateway.trustedProxies` to include a loopback proxy address when `auth.mode="trusted-proxy"` and `bind="loopback"`, preventing same-host proxy misconfiguration from silently blocking auth. (#22082, follow-up to #20097) thanks @mbelinky. - Agents/System Prompt: label allowlisted senders as authorized senders to avoid implying ownership. Thanks @thewilloftheshadow. - Gateway/Auth: allow trusted-proxy mode with loopback bind for same-host reverse-proxy deployments, while still requiring configured `gateway.trustedProxies`. 
(#20097) thanks @xinhuagu. diff --git a/docs/concepts/session-tool.md b/docs/concepts/session-tool.md index b44d892be..d2f4b7dec 100644 --- a/docs/concepts/session-tool.md +++ b/docs/concepts/session-tool.md @@ -166,12 +166,12 @@ Behavior: - Starts a new `agent::subagent:` session with `deliver: false`. - Sub-agents default to the full tool set **minus session tools** (configurable via `tools.subagents.tools`). -- Sub-agents are not allowed to call `sessions_spawn` (no sub-agent → sub-agent spawning). +- Depth policy is enforced for nested spawns. With the default `maxSpawnDepth = 2`, depth-1 sub-agents can call `sessions_spawn`; depth-2 sub-agents cannot. - Always non-blocking: returns `{ status: "accepted", runId, childSessionKey }` immediately. -- After completion, OpenClaw runs a sub-agent **announce step** and posts the result to the requester chat channel. - - If the assistant final reply is empty, the latest `toolResult` from sub-agent history is included as `Result`. -- Reply exactly `ANNOUNCE_SKIP` during the announce step to stay silent. -- Announce replies are normalized to `Status`/`Result`/`Notes`; `Status` comes from runtime outcome (not model text). +- After completion, OpenClaw builds a sub-agent announce system message from the child session's latest assistant reply and injects it into the requester session. +- Delivery stays internal (`deliver=false`) when the requester is a sub-agent, and is user-facing (`deliver=true`) when the requester is main. +- Recipient agents can return the internal silent token to suppress duplicate outward delivery in the same turn. +- Announce replies are normalized to runtime-derived status plus result context. - Sub-agent sessions are auto-archived after `agents.defaults.subagents.archiveAfterMinutes` (default: 60). - Announce replies include a stats line (runtime, tokens, sessionKey/sessionId, transcript path, and optional cost). 
diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index 3022d5519..e05bcf98e 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -35,7 +35,7 @@ Use `/subagents` to inspect or control sub-agent runs for the **current session* - If direct delivery fails, it falls back to queue routing. - If queue routing is still not available, the announce is retried with a short exponential backoff before final give-up. - The completion message is a system message and includes: - - `Result` (`assistant` reply text, or latest `toolResult` if the assistant reply is empty) + - `Result` (latest assistant reply text from the child session, after a short settle retry) - `Status` (`completed successfully` / `failed` / `timed out`) - compact runtime/token stats - `--model` and `--thinking` override defaults for that specific run. @@ -90,7 +90,7 @@ Auto-archive: ## Nested Sub-Agents -By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). You can enable one level of nesting by setting `maxSpawnDepth: 2`, which allows the **orchestrator pattern**: main → orchestrator sub-agent → worker sub-sub-agents. +By default, sub-agents can spawn one additional level (`maxSpawnDepth: 2`), enabling the **orchestrator pattern**: main → orchestrator sub-agent → worker sub-sub-agents. Set `maxSpawnDepth: 1` to disable nested spawning. ### How to enable @@ -99,7 +99,7 @@ By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). Y agents: { defaults: { subagents: { - maxSpawnDepth: 2, // allow sub-agents to spawn children (default: 1) + maxSpawnDepth: 2, // allow sub-agents to spawn children (default: 2) maxChildrenPerAgent: 5, // max active children per agent session (default: 5) maxConcurrent: 8, // global concurrency lane cap (default: 8) }, @@ -110,11 +110,11 @@ By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). Y ### Depth levels -| Depth | Session key shape | Role | Can spawn? 
| -| ----- | -------------------------------------------- | --------------------------------------------- | ---------------------------- | -| 0 | `agent::main` | Main agent | Always | -| 1 | `agent::subagent:` | Sub-agent (orchestrator when depth 2 allowed) | Only if `maxSpawnDepth >= 2` | -| 2 | `agent::subagent::subagent:` | Sub-sub-agent (leaf worker) | Never | +| Depth | Session key shape | Role | Can spawn? | +| ----- | -------------------------------------------- | ----------------------------------- | ------------------------------ | +| 0 | `agent::main` | Main agent | Always | +| 1 | `agent::subagent:` | Sub-agent (orchestrator by default) | Yes, when `maxSpawnDepth >= 2` | +| 2 | `agent::subagent::subagent:` | Sub-sub-agent (leaf worker) | No, when `maxSpawnDepth = 2` | ### Announce chain @@ -128,9 +128,9 @@ Each level only sees announces from its direct children. ### Tool policy by depth -- **Depth 1 (orchestrator, when `maxSpawnDepth >= 2`)**: Gets `sessions_spawn`, `subagents`, `sessions_list`, `sessions_history` so it can manage its children. Other session/system tools remain denied. -- **Depth 1 (leaf, when `maxSpawnDepth == 1`)**: No session tools (current default behavior). -- **Depth 2 (leaf worker)**: No session tools — `sessions_spawn` is always denied at depth 2. Cannot spawn further children. +- **Depth 1 (orchestrator, default with `maxSpawnDepth = 2`)**: Gets `sessions_spawn`, `subagents`, `sessions_list`, `sessions_history` so it can manage its children. Other session/system tools remain denied. +- **Depth 1 (leaf, when `maxSpawnDepth = 1`)**: No session tools. +- **Depth 2 (leaf worker, default `maxSpawnDepth = 2`)**: No session tools; `sessions_spawn` is denied at depth 2, so it cannot spawn further children. ### Per-agent spawn limit @@ -156,17 +156,16 @@ Note: the merge is additive, so main profiles are always available as fallbacks. 
## Announce -Sub-agents report back via an announce step: +Sub-agents report back via an announce injection step: -- The announce step runs inside the sub-agent session (not the requester session). -- If the sub-agent replies exactly `ANNOUNCE_SKIP`, nothing is posted. -- Otherwise the announce reply is posted to the requester chat channel via a follow-up `agent` call (`deliver=true`). -- Announce replies preserve thread/topic routing when available (Slack threads, Telegram topics, Matrix threads). -- Announce messages are normalized to a stable template: - - `Status:` derived from the run outcome (`success`, `error`, `timeout`, or `unknown`). - - `Result:` the summary content from the announce step (or `(not available)` if missing). - - `Notes:` error details and other useful context. -- `Status` is not inferred from model output; it comes from runtime outcome signals. +- OpenClaw reads the child session's latest assistant reply after completion, with a short settle retry. +- It builds a system message with `Status`, `Result`, compact stats, and reply guidance. +- The message is injected with a follow-up `agent` call: + - `deliver=false` when the requester is another sub-agent; this keeps orchestration internal. + - `deliver=true` when the requester is main; this produces the user-facing update. +- Delivery context prefers the captured requester origin, but non-deliverable channels (for example `webchat`) are ignored in favor of persisted deliverable routes. +- Recipient agents can return the internal silent token to suppress duplicate outward delivery in the same turn. +- `Status` is derived from runtime outcome signals, not inferred from model output. 
Announce payloads include a stats line at the end (even when wrapped): @@ -184,7 +183,7 @@ By default, sub-agents get **all tools except session tools** and system tools: - `sessions_send` - `sessions_spawn` -When `maxSpawnDepth >= 2`, depth-1 orchestrator sub-agents additionally receive `sessions_spawn`, `subagents`, `sessions_list`, and `sessions_history` so they can manage their children. +With the default `maxSpawnDepth = 2`, depth-1 orchestrator sub-agents receive `sessions_spawn`, `subagents`, `sessions_list`, and `sessions_history` so they can manage their children. If you set `maxSpawnDepth = 1`, those session tools stay denied. Override via config: diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-depth-limits.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-depth-limits.test.ts index 0cb5b62c8..08d499d94 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-depth-limits.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-depth-limits.test.ts @@ -94,13 +94,14 @@ describe("sessions_spawn depth + child limits", () => { }); }); - it("rejects spawning when caller depth reaches maxSpawnDepth", async () => { + it("allows depth-1 callers by default (maxSpawnDepth defaults to 2)", async () => { const tool = createSessionsSpawnTool({ agentSessionKey: "agent:main:subagent:parent" }); const result = await tool.execute("call-depth-reject", { task: "hello" }); expect(result.details).toMatchObject({ - status: "forbidden", - error: "sessions_spawn is not allowed at this depth (current depth: 1, max: 1)", + status: "accepted", + childSessionKey: expect.stringMatching(/^agent:main:subagent:/), + runId: "run-depth", }); }); diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts index b3fbdacf1..8f77474d1 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts +++ 
b/src/agents/openclaw-tools.subagents.sessions-spawn.lifecycle.e2e.test.ts @@ -133,35 +133,6 @@ const waitFor = async (predicate: () => boolean, timeoutMs = 2000) => { ); }; -function expectSingleCompletionSend( - calls: GatewayRequest[], - expected: { sessionKey: string; channel: string; to: string; message: string }, -) { - const sendCalls = calls.filter((call) => call.method === "send"); - expect(sendCalls).toHaveLength(1); - const send = sendCalls[0]?.params as - | { sessionKey?: string; channel?: string; to?: string; message?: string } - | undefined; - expect(send?.sessionKey).toBe(expected.sessionKey); - expect(send?.channel).toBe(expected.channel); - expect(send?.to).toBe(expected.to); - expect(send?.message).toBe(expected.message); -} - -function createDeleteCleanupHooks(setDeletedKey: (key: string | undefined) => void) { - return { - onAgentSubagentSpawn: (params: unknown) => { - const rec = params as { channel?: string; timeout?: number } | undefined; - expect(rec?.channel).toBe("discord"); - expect(rec?.timeout).toBe(1); - }, - onSessionsDelete: (params: unknown) => { - const rec = params as { key?: string } | undefined; - setDeletedKey(rec?.key); - }, - }; -} - describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { beforeEach(() => { resetSessionsSpawnConfigOverride(); @@ -184,7 +155,6 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { const tool = await getSessionsSpawnTool({ agentSessionKey: "main", agentChannel: "whatsapp", - agentTo: "+123", }); const result = await tool.execute("call2", { @@ -213,7 +183,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId)); await waitFor(() => patchCalls.some((call) => call.label === "my-task")); - await waitFor(() => ctx.calls.filter((c) => c.method === "send").length >= 1); + await waitFor(() => ctx.calls.filter((c) => c.method === "agent").length >= 2); const childWait 
= ctx.waitCalls.find((call) => call.runId === child.runId); expect(childWait?.timeoutMs).toBe(1000); @@ -222,21 +192,22 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(labelPatch?.key).toBe(child.sessionKey); expect(labelPatch?.label).toBe("my-task"); - // Subagent spawn call plus direct outbound completion send. + // Two agent calls: subagent spawn + main agent trigger const agentCalls = ctx.calls.filter((c) => c.method === "agent"); - expect(agentCalls).toHaveLength(1); + expect(agentCalls).toHaveLength(2); // First call: subagent spawn const first = agentCalls[0]?.params as { lane?: string } | undefined; expect(first?.lane).toBe("subagent"); - // Direct send should route completion to the requester channel/session. - expectSingleCompletionSend(ctx.calls, { - sessionKey: "agent:main:main", - channel: "whatsapp", - to: "+123", - message: "✅ Subagent main finished\n\ndone", - }); + // Second call: main agent trigger (not "Sub-agent announce step." anymore) + const second = agentCalls[1]?.params as { sessionKey?: string; message?: string } | undefined; + expect(second?.sessionKey).toBe("main"); + expect(second?.message).toContain("subagent task"); + + // No direct send to external channel (main agent handles delivery) + const sendCalls = ctx.calls.filter((c) => c.method === "send"); + expect(sendCalls.length).toBe(0); expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); }); @@ -245,15 +216,20 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { callGatewayMock.mockReset(); let deletedKey: string | undefined; const ctx = setupSessionsSpawnGatewayMock({ - ...createDeleteCleanupHooks((key) => { - deletedKey = key; - }), + onAgentSubagentSpawn: (params) => { + const rec = params as { channel?: string; timeout?: number } | undefined; + expect(rec?.channel).toBe("discord"); + expect(rec?.timeout).toBe(1); + }, + onSessionsDelete: (params) => { + const rec = params as { key?: string } | 
undefined; + deletedKey = rec?.key; + }, }); const tool = await getSessionsSpawnTool({ agentSessionKey: "discord:group:req", agentChannel: "discord", - agentTo: "discord:dm:u123", }); const result = await tool.execute("call1", { @@ -291,7 +267,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(childWait?.timeoutMs).toBe(1000); const agentCalls = ctx.calls.filter((call) => call.method === "agent"); - expect(agentCalls).toHaveLength(1); + expect(agentCalls).toHaveLength(2); const first = agentCalls[0]?.params as | { @@ -307,12 +283,19 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { expect(first?.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); - expectSingleCompletionSend(ctx.calls, { - sessionKey: "agent:main:discord:group:req", - channel: "discord", - to: "discord:dm:u123", - message: "✅ Subagent main finished", - }); + const second = agentCalls[1]?.params as + | { + sessionKey?: string; + message?: string; + deliver?: boolean; + } + | undefined; + expect(second?.sessionKey).toBe("discord:group:req"); + expect(second?.deliver).toBe(true); + expect(second?.message).toContain("subagent task"); + + const sendCalls = ctx.calls.filter((c) => c.method === "send"); + expect(sendCalls.length).toBe(0); expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); }); @@ -323,16 +306,21 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { let deletedKey: string | undefined; const ctx = setupSessionsSpawnGatewayMock({ includeChatHistory: true, - ...createDeleteCleanupHooks((key) => { - deletedKey = key; - }), + onAgentSubagentSpawn: (params) => { + const rec = params as { channel?: string; timeout?: number } | undefined; + expect(rec?.channel).toBe("discord"); + expect(rec?.timeout).toBe(1); + }, + onSessionsDelete: (params) => { + const rec = params as { key?: string } | undefined; + deletedKey = 
rec?.key; + }, agentWaitResult: { status: "ok", startedAt: 3000, endedAt: 4000 }, }); const tool = await getSessionsSpawnTool({ agentSessionKey: "discord:group:req", agentChannel: "discord", - agentTo: "discord:dm:u123", }); const result = await tool.execute("call1b", { @@ -350,27 +338,29 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => { throw new Error("missing child runId"); } await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId)); - await waitFor(() => ctx.calls.filter((call) => call.method === "send").length >= 1); + await waitFor(() => ctx.calls.filter((call) => call.method === "agent").length >= 2); await waitFor(() => Boolean(deletedKey)); const childWait = ctx.waitCalls.find((call) => call.runId === child.runId); expect(childWait?.timeoutMs).toBe(1000); expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); - // One agent call for spawn, then direct completion send. + // Two agent calls: subagent spawn + main agent trigger const agentCalls = ctx.calls.filter((call) => call.method === "agent"); - expect(agentCalls).toHaveLength(1); + expect(agentCalls).toHaveLength(2); // First call: subagent spawn const first = agentCalls[0]?.params as { lane?: string } | undefined; expect(first?.lane).toBe("subagent"); - expectSingleCompletionSend(ctx.calls, { - sessionKey: "agent:main:discord:group:req", - channel: "discord", - to: "discord:dm:u123", - message: "✅ Subagent main finished\n\ndone", - }); + // Second call: main agent trigger + const second = agentCalls[1]?.params as { sessionKey?: string; deliver?: boolean } | undefined; + expect(second?.sessionKey).toBe("discord:group:req"); + expect(second?.deliver).toBe(true); + + // No direct send to external channel (main agent handles delivery) + const sendCalls = ctx.calls.filter((c) => c.method === "send"); + expect(sendCalls.length).toBe(0); // Session should be deleted expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); diff --git 
a/src/agents/pi-tools.policy.ts b/src/agents/pi-tools.policy.ts index 14b0e2d29..3c363ac41 100644 --- a/src/agents/pi-tools.policy.ts +++ b/src/agents/pi-tools.policy.ts @@ -1,4 +1,5 @@ import { getChannelDock } from "../channels/dock.js"; +import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js"; import type { OpenClawConfig } from "../config/config.js"; import { resolveChannelGroupToolsPolicy } from "../config/group-policy.js"; import { resolveThreadParentSessionKey } from "../sessions/session-key-utils.js"; @@ -83,7 +84,8 @@ function resolveSubagentDenyList(depth: number, maxSpawnDepth: number): string[] export function resolveSubagentToolPolicy(cfg?: OpenClawConfig, depth?: number): SandboxToolPolicy { const configured = cfg?.tools?.subagents?.tools; - const maxSpawnDepth = cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? 1; + const maxSpawnDepth = + cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH; const effectiveDepth = typeof depth === "number" && depth >= 0 ? depth : 1; const baseDeny = resolveSubagentDenyList(effectiveDepth, maxSpawnDepth); const deny = [...baseDeny, ...(Array.isArray(configured?.deny) ? 
configured.deny : [])]; diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index b6e594a40..8af9bed03 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -7,8 +7,13 @@ type RequesterResolution = { requesterOrigin?: Record; } | null; +type DescendantRun = { + runId: string; + requesterSessionKey: string; + childSessionKey: string; +}; + const agentSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" })); -const sendSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" })); const sessionsDeleteSpy = vi.fn((_req: AgentCallRequest) => undefined); const readLatestAssistantReplyMock = vi.fn( async (_sessionKey?: string): Promise => "raw subagent reply", @@ -22,11 +27,9 @@ const embeddedRunMock = { const subagentRegistryMock = { isSubagentSessionRunActive: vi.fn(() => true), countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0), + listDescendantRunsForRequester: vi.fn((_sessionKey: string): DescendantRun[] => []), resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null), }; -const chatHistoryMock = vi.fn(async (_sessionKey?: string) => ({ - messages: [] as Array, -})); let sessionStore: Record> = {}; let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { session: { @@ -67,15 +70,9 @@ vi.mock("../gateway/call.js", () => ({ if (typed.method === "agent") { return await agentSpy(typed); } - if (typed.method === "send") { - return await sendSpy(typed); - } if (typed.method === "agent.wait") { return { status: "error", startedAt: 10, endedAt: 20, error: "boom" }; } - if (typed.method === "chat.history") { - return await chatHistoryMock(typed.params?.sessionKey); - } if (typed.method === "sessions.patch") { return {}; } @@ -115,7 +112,6 @@ vi.mock("../config/config.js", async (importOriginal) => { describe("subagent announce 
formatting", () => { beforeEach(() => { agentSpy.mockClear(); - sendSpy.mockClear(); sessionsDeleteSpy.mockClear(); embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false); @@ -123,9 +119,9 @@ describe("subagent announce formatting", () => { embeddedRunMock.waitForEmbeddedPiRunEnd.mockReset().mockResolvedValue(true); subagentRegistryMock.isSubagentSessionRunActive.mockReset().mockReturnValue(true); subagentRegistryMock.countActiveDescendantRuns.mockReset().mockReturnValue(0); + subagentRegistryMock.listDescendantRunsForRequester.mockReset().mockReturnValue([]); subagentRegistryMock.resolveRequesterForChildSession.mockReset().mockReturnValue(null); readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply"); - chatHistoryMock.mockReset().mockResolvedValue({ messages: [] }); sessionStore = {}; configOverride = { session: { @@ -209,72 +205,6 @@ describe("subagent announce formatting", () => { ); }); - it.each([ - { role: "toolResult", toolOutput: "tool output line 1", childRunId: "run-tool-fallback-1" }, - { role: "tool", toolOutput: "tool output line 2", childRunId: "run-tool-fallback-2" }, - ] as const)( - "falls back to latest $role output when assistant reply is empty", - async (testCase) => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - chatHistoryMock.mockResolvedValueOnce({ - messages: [ - { - role: "assistant", - content: [{ type: "text", text: "" }], - }, - { - role: testCase.role, - content: [{ type: "text", text: testCase.toolOutput }], - }, - ], - }); - readLatestAssistantReplyMock.mockResolvedValue(""); - - await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: testCase.childRunId, - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - ...defaultOutcomeAnnounce, - waitForCompletion: false, - }); - - const call = agentSpy.mock.calls[0]?.[0] as { 
params?: { message?: string } }; - const msg = call?.params?.message as string; - expect(msg).toContain(testCase.toolOutput); - }, - ); - - it("uses latest assistant text when it appears after a tool output", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - chatHistoryMock.mockResolvedValueOnce({ - messages: [ - { - role: "tool", - content: [{ type: "text", text: "tool output line" }], - }, - { - role: "assistant", - content: [{ type: "text", text: "assistant final line" }], - }, - ], - }); - readLatestAssistantReplyMock.mockResolvedValue(""); - - await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-latest-assistant", - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - ...defaultOutcomeAnnounce, - waitForCompletion: false, - }); - - const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; - const msg = call?.params?.message as string; - expect(msg).toContain("assistant final line"); - }); - it("keeps full findings and includes compact stats", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); sessionStore = { @@ -312,121 +242,6 @@ describe("subagent announce formatting", () => { expect(msg).toContain("step-139"); }); - it("sends deterministic completion message directly for manual spawn completion", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - sessionStore = { - "agent:main:subagent:test": { - sessionId: "child-session-direct", - inputTokens: 12, - outputTokens: 34, - totalTokens: 46, - }, - "agent:main:main": { - sessionId: "requester-session", - }, - }; - chatHistoryMock.mockResolvedValueOnce({ - messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: 2" }] }], - }); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-direct-completion", - 
requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" }, - ...defaultOutcomeAnnounce, - expectsCompletionMessage: true, - }); - - expect(didAnnounce).toBe(true); - expect(sendSpy).toHaveBeenCalledTimes(1); - expect(agentSpy).not.toHaveBeenCalled(); - const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; - const rawMessage = call?.params?.message; - const msg = typeof rawMessage === "string" ? rawMessage : ""; - expect(call?.params?.channel).toBe("discord"); - expect(call?.params?.to).toBe("channel:12345"); - expect(call?.params?.sessionKey).toBe("agent:main:main"); - expect(msg).toContain("✅ Subagent main finished"); - expect(msg).toContain("final answer: 2"); - expect(msg).not.toContain("Convert the result above into your normal assistant voice"); - }); - - it("ignores stale session thread hints for manual completion direct-send", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - sessionStore = { - "agent:main:subagent:test": { - sessionId: "child-session-direct-thread", - }, - "agent:main:main": { - sessionId: "requester-session-thread", - lastChannel: "discord", - lastTo: "channel:stale", - lastThreadId: 42, - }, - }; - chatHistoryMock.mockResolvedValueOnce({ - messages: [{ role: "assistant", content: [{ type: "text", text: "done" }] }], - }); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-direct-stale-thread", - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" }, - ...defaultOutcomeAnnounce, - expectsCompletionMessage: true, - }); - - expect(didAnnounce).toBe(true); - expect(sendSpy).toHaveBeenCalledTimes(1); - expect(agentSpy).not.toHaveBeenCalled(); - const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; - 
expect(call?.params?.channel).toBe("discord"); - expect(call?.params?.to).toBe("channel:12345"); - expect(call?.params?.threadId).toBeUndefined(); - }); - - it("passes requesterOrigin.threadId for manual completion direct-send", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - sessionStore = { - "agent:main:subagent:test": { - sessionId: "child-session-direct-thread-pass", - }, - "agent:main:main": { - sessionId: "requester-session-thread-pass", - }, - }; - chatHistoryMock.mockResolvedValueOnce({ - messages: [{ role: "assistant", content: [{ type: "text", text: "done" }] }], - }); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-direct-thread-pass", - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - requesterOrigin: { - channel: "discord", - to: "channel:12345", - accountId: "acct-1", - threadId: 99, - }, - ...defaultOutcomeAnnounce, - expectsCompletionMessage: true, - }); - - expect(didAnnounce).toBe(true); - expect(sendSpy).toHaveBeenCalledTimes(1); - expect(agentSpy).not.toHaveBeenCalled(); - const call = sendSpy.mock.calls[0]?.[0] as { params?: Record }; - expect(call?.params?.channel).toBe("discord"); - expect(call?.params?.to).toBe("channel:12345"); - expect(call?.params?.threadId).toBe("99"); - }); - it("steers announcements into an active run when queue mode is steer", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); @@ -541,139 +356,6 @@ describe("subagent announce formatting", () => { expect(new Set(idempotencyKeys).size).toBe(2); }); - it("prefers direct delivery first for completion-mode and then queues on direct failure", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - 
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-collect", - lastChannel: "whatsapp", - lastTo: "+1555", - queueMode: "collect", - queueDebounceMs: 0, - }, - }; - sendSpy.mockRejectedValueOnce(new Error("direct delivery unavailable")); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-completion-direct-fallback", - requesterSessionKey: "main", - requesterDisplayKey: "main", - expectsCompletionMessage: true, - ...defaultOutcomeAnnounce, - }); - - expect(didAnnounce).toBe(true); - await expect.poll(() => sendSpy.mock.calls.length).toBe(1); - await expect.poll(() => agentSpy.mock.calls.length).toBe(1); - expect(sendSpy.mock.calls[0]?.[0]).toMatchObject({ - method: "send", - params: { sessionKey: "agent:main:main" }, - }); - expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({ - method: "agent", - params: { sessionKey: "agent:main:main" }, - }); - expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({ - method: "agent", - params: { channel: "whatsapp", to: "+1555", deliver: true }, - }); - }); - - it("returns failure for completion-mode when direct delivery fails and queue fallback is unavailable", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-direct-only", - lastChannel: "whatsapp", - lastTo: "+1555", - }, - }; - sendSpy.mockRejectedValueOnce(new Error("direct delivery unavailable")); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-completion-direct-fail", - requesterSessionKey: "main", - requesterDisplayKey: "main", - expectsCompletionMessage: true, - ...defaultOutcomeAnnounce, - }); - - 
expect(didAnnounce).toBe(false); - expect(sendSpy).toHaveBeenCalledTimes(1); - expect(agentSpy).toHaveBeenCalledTimes(0); - }); - - it("uses assistant output for completion-mode when latest assistant text exists", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - chatHistoryMock.mockResolvedValueOnce({ - messages: [ - { - role: "toolResult", - content: [{ type: "text", text: "old tool output" }], - }, - { - role: "assistant", - content: [{ type: "text", text: "assistant completion text" }], - }, - ], - }); - readLatestAssistantReplyMock.mockResolvedValue("assistant ignored fallback"); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-completion-assistant-output", - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - expectsCompletionMessage: true, - ...defaultOutcomeAnnounce, - }); - - expect(didAnnounce).toBe(true); - await expect.poll(() => sendSpy.mock.calls.length).toBe(1); - const call = sendSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; - const msg = call?.params?.message as string; - expect(msg).toContain("assistant completion text"); - expect(msg).not.toContain("old tool output"); - }); - - it("falls back to latest tool output for completion-mode when assistant output is empty", async () => { - const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); - chatHistoryMock.mockResolvedValueOnce({ - messages: [ - { - role: "assistant", - content: [{ type: "text", text: "" }], - }, - { - role: "toolResult", - content: [{ type: "text", text: "tool output only" }], - }, - ], - }); - readLatestAssistantReplyMock.mockResolvedValue(""); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-completion-tool-output", - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - expectsCompletionMessage: true, - 
...defaultOutcomeAnnounce, - }); - - expect(didAnnounce).toBe(true); - await expect.poll(() => sendSpy.mock.calls.length).toBe(1); - const call = sendSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; - const msg = call?.params?.message as string; - expect(msg).toContain("tool output only"); - }); - it("queues announce delivery back into requester subagent session", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); @@ -706,24 +388,7 @@ describe("subagent announce formatting", () => { expect(call?.params?.to).toBeUndefined(); }); - it.each([ - { - testName: "includes threadId when origin has an active topic/thread", - childRunId: "run-thread", - expectedThreadId: "42", - requesterOrigin: undefined, - }, - { - testName: "prefers requesterOrigin.threadId over session entry threadId", - childRunId: "run-thread-override", - expectedThreadId: "99", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - threadId: 99, - }, - }, - ] as const)("$testName", async (testCase) => { + it("includes threadId when origin has an active topic/thread", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); @@ -740,10 +405,9 @@ describe("subagent announce formatting", () => { const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", - childRunId: testCase.childRunId, + childRunId: "run-thread", requesterSessionKey: "main", requesterDisplayKey: "main", - ...(testCase.requesterOrigin ? 
{ requesterOrigin: testCase.requesterOrigin } : {}), ...defaultOutcomeAnnounce, }); @@ -751,7 +415,42 @@ describe("subagent announce formatting", () => { const params = await getSingleAgentCallParams(); expect(params.channel).toBe("telegram"); expect(params.to).toBe("telegram:123"); - expect(params.threadId).toBe(testCase.expectedThreadId); + expect(params.threadId).toBe("42"); + }); + + it("prefers requesterOrigin.threadId over session entry threadId", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-thread-override", + lastChannel: "telegram", + lastTo: "telegram:123", + lastThreadId: 42, + queueMode: "collect", + queueDebounceMs: 0, + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-thread-override", + requesterSessionKey: "main", + requesterDisplayKey: "main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + threadId: 99, + }, + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(1); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.threadId).toBe("99"); }); it("splits collect-mode queues when accountId differs", async () => { @@ -795,31 +494,16 @@ describe("subagent announce formatting", () => { expect(accountIds).toEqual(expect.arrayContaining(["acct-a", "acct-b"])); }); - it.each([ - { - testName: "uses requester origin for direct announce when not queued", - childRunId: "run-direct", - requesterOrigin: { channel: "whatsapp", accountId: "acct-123" }, - expectedChannel: "whatsapp", - expectedAccountId: "acct-123", - }, - { - testName: "normalizes requesterOrigin for direct announce delivery", - childRunId: 
"run-direct-origin", - requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " }, - expectedChannel: "whatsapp", - expectedAccountId: "acct-987", - }, - ] as const)("$testName", async (testCase) => { + it("uses requester origin for direct announce when not queued", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", - childRunId: testCase.childRunId, + childRunId: "run-direct", requesterSessionKey: "agent:main:main", - requesterOrigin: testCase.requesterOrigin, + requesterOrigin: { channel: "whatsapp", accountId: "acct-123" }, requesterDisplayKey: "main", ...defaultOutcomeAnnounce, }); @@ -829,8 +513,8 @@ describe("subagent announce formatting", () => { params?: Record; expectFinal?: boolean; }; - expect(call?.params?.channel).toBe(testCase.expectedChannel); - expect(call?.params?.accountId).toBe(testCase.expectedAccountId); + expect(call?.params?.channel).toBe("whatsapp"); + expect(call?.params?.accountId).toBe("acct-123"); expect(call?.expectFinal).toBe(true); }); @@ -933,6 +617,93 @@ describe("subagent announce formatting", () => { expect(agentSpy).not.toHaveBeenCalled(); }); + it("waits for follow-up reply when descendant runs exist and child reply is still waiting", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + const waitingReply = "Spawned the nested subagent. Waiting for its auto-announced result."; + const finalReply = "Nested subagent finished and I synthesized the final result."; + + subagentRegistryMock.listDescendantRunsForRequester.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" + ? 
[ + { + runId: "run-leaf", + requesterSessionKey: sessionKey, + childSessionKey: "agent:main:subagent:parent:subagent:leaf", + }, + ] + : [], + ); + readLatestAssistantReplyMock + .mockResolvedValueOnce(waitingReply) + .mockResolvedValueOnce(waitingReply) + .mockResolvedValueOnce(finalReply); + + vi.useFakeTimers(); + try { + const announcePromise = runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + }); + + await vi.advanceTimersByTimeAsync(500); + const didAnnounce = await announcePromise; + expect(didAnnounce).toBe(true); + } finally { + vi.useRealTimers(); + } + + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + const msg = call?.params?.message as string; + expect(msg).toContain(finalReply); + expect(msg).not.toContain("Waiting for its auto-announced result."); + }); + + it("defers announce when descendant follow-up reply has not arrived yet", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + const waitingReply = "Spawned the nested subagent. Waiting for its auto-announced result."; + + subagentRegistryMock.listDescendantRunsForRequester.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" + ? 
[ + { + runId: "run-leaf", + requesterSessionKey: sessionKey, + childSessionKey: "agent:main:subagent:parent:subagent:leaf", + }, + ] + : [], + ); + readLatestAssistantReplyMock.mockResolvedValue(waitingReply); + + vi.useFakeTimers(); + try { + const announcePromise = runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent-still-waiting", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "nested test", + timeoutMs: 700, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + await vi.advanceTimersByTimeAsync(1200); + const didAnnounce = await announcePromise; + expect(didAnnounce).toBe(false); + } finally { + vi.useRealTimers(); + } + + expect(agentSpy).not.toHaveBeenCalled(); + }); + it("bubbles child announce to parent requester when requester subagent already ended", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); subagentRegistryMock.isSubagentSessionRunActive.mockReturnValue(false); @@ -1013,6 +784,26 @@ describe("subagent announce formatting", () => { expect(agentSpy).not.toHaveBeenCalled(); }); + it("normalizes requesterOrigin for direct announce delivery", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-direct-origin", + requesterSessionKey: "agent:main:main", + requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " }, + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("whatsapp"); + 
expect(call?.params?.accountId).toBe("acct-987"); + }); + it("prefers requesterOrigin channel over stale session lastChannel in queued announce", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); @@ -1045,6 +836,35 @@ describe("subagent announce formatting", () => { expect(call?.params?.to).toBe("bluebubbles:chat_guid:123"); }); + it("falls back to persisted deliverable route when requesterOrigin channel is non-deliverable", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-webchat-origin", + lastChannel: "discord", + lastTo: "discord:channel:123", + lastAccountId: "acct-store", + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-webchat-origin", + requesterSessionKey: "main", + requesterOrigin: { channel: "webchat", to: "ignored", accountId: "acct-live" }, + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + }); + + expect(didAnnounce).toBe(true); + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("discord:channel:123"); + expect(call?.params?.accountId).toBe("acct-live"); + }); + it("routes to parent subagent when parent run ended but session still exists (#18037)", async () => { // Scenario: Newton (depth-1) spawns Birdie (depth-2). Newton's agent turn ends // after spawning but Newton's SESSION still exists (waiting for Birdie's result). 
diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 389ee1149..3f328e620 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -1,5 +1,6 @@ import { resolveQueueSettings } from "../auto-reply/reply/queue.js"; import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; +import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, @@ -10,14 +11,13 @@ import { import { callGateway } from "../gateway/call.js"; import { normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; -import { extractTextFromChatContent } from "../shared/chat-content.js"; import { type DeliveryContext, deliveryContextFromSession, mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; -import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { isInternalMessageChannel } from "../utils/message-channel.js"; import { buildAnnounceIdFromChildRun, buildAnnounceIdempotencyKey, @@ -30,170 +30,7 @@ import { } from "./pi-embedded.js"; import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { getSubagentDepthFromSessionStore } from "./subagent-depth.js"; -import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js"; - -type ToolResultMessage = { - role?: unknown; - content?: unknown; -}; - -type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; - -type SubagentAnnounceDeliveryResult = { - delivered: boolean; - path: SubagentDeliveryPath; - error?: string; -}; - -function buildCompletionDeliveryMessage(params: { - findings: string; - subagentName: string; -}): string { - const findingsText = params.findings.trim(); - const hasFindings = findingsText.length > 0 && findingsText !== "(no output)"; - const header = `✅ Subagent ${params.subagentName} finished`; - if 
(!hasFindings) { - return header; - } - return `${header}\n\n${findingsText}`; -} - -function summarizeDeliveryError(error: unknown): string { - if (error instanceof Error) { - return error.message || "error"; - } - if (typeof error === "string") { - return error; - } - if (error === undefined || error === null) { - return "unknown error"; - } - try { - return JSON.stringify(error); - } catch { - return "error"; - } -} - -function extractToolResultText(content: unknown): string { - if (typeof content === "string") { - return sanitizeTextContent(content); - } - if (content && typeof content === "object" && !Array.isArray(content)) { - const obj = content as { - text?: unknown; - output?: unknown; - content?: unknown; - result?: unknown; - error?: unknown; - summary?: unknown; - }; - if (typeof obj.text === "string") { - return sanitizeTextContent(obj.text); - } - if (typeof obj.output === "string") { - return sanitizeTextContent(obj.output); - } - if (typeof obj.content === "string") { - return sanitizeTextContent(obj.content); - } - if (typeof obj.result === "string") { - return sanitizeTextContent(obj.result); - } - if (typeof obj.error === "string") { - return sanitizeTextContent(obj.error); - } - if (typeof obj.summary === "string") { - return sanitizeTextContent(obj.summary); - } - } - if (!Array.isArray(content)) { - return ""; - } - const joined = extractTextFromChatContent(content, { - sanitizeText: sanitizeTextContent, - normalizeText: (text) => text, - joinWith: "\n", - }); - return joined?.trim() ?? ""; -} - -function extractInlineTextContent(content: unknown): string { - if (!Array.isArray(content)) { - return ""; - } - return ( - extractTextFromChatContent(content, { - sanitizeText: sanitizeTextContent, - normalizeText: (text) => text.trim(), - joinWith: "", - }) ?? 
"" - ); -} - -function extractSubagentOutputText(message: unknown): string { - if (!message || typeof message !== "object") { - return ""; - } - const role = (message as { role?: unknown }).role; - const content = (message as { content?: unknown }).content; - if (role === "assistant") { - const assistantText = extractAssistantText(message); - if (assistantText) { - return assistantText; - } - if (typeof content === "string") { - return sanitizeTextContent(content); - } - if (Array.isArray(content)) { - return extractInlineTextContent(content); - } - return ""; - } - if (role === "toolResult" || role === "tool") { - return extractToolResultText((message as ToolResultMessage).content); - } - if (typeof content === "string") { - return sanitizeTextContent(content); - } - if (Array.isArray(content)) { - return extractInlineTextContent(content); - } - return ""; -} - -async function readLatestSubagentOutput(sessionKey: string): Promise { - const history = await callGateway<{ messages?: Array }>({ - method: "chat.history", - params: { sessionKey, limit: 50 }, - }); - const messages = Array.isArray(history?.messages) ? 
history.messages : []; - for (let i = messages.length - 1; i >= 0; i -= 1) { - const msg = messages[i]; - const text = extractSubagentOutputText(msg); - if (text) { - return text; - } - } - return undefined; -} - -async function readLatestSubagentOutputWithRetry(params: { - sessionKey: string; - maxWaitMs: number; -}): Promise { - const RETRY_INTERVAL_MS = 100; - const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000)); - let result: string | undefined; - while (Date.now() < deadline) { - result = await readLatestSubagentOutput(params.sessionKey); - if (result?.trim()) { - return result; - } - await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS)); - } - return result; -} +import { readLatestAssistantReply } from "./tools/agent-step.js"; function formatDurationShort(valueMs?: number) { if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) { @@ -273,8 +110,8 @@ function resolveAnnounceOrigin( ): DeliveryContext | undefined { const normalizedRequester = normalizeDeliveryContext(requesterOrigin); const normalizedEntry = deliveryContextFromSession(entry); - if (normalizedRequester?.channel && !isDeliverableMessageChannel(normalizedRequester.channel)) { - // Ignore internal/non-deliverable channel hints (for example webchat) + if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) { + // Ignore internal channel hints, for example webchat, // so a valid persisted route can still be used for outbound delivery. return mergeDeliveryContext( { @@ -284,7 +121,7 @@ function resolveAnnounceOrigin( normalizedEntry, ); } - // requesterOrigin (captured at spawn time) reflects the channel the user is + // requesterOrigin, captured at spawn time, reflects the channel the user is // actually on and must take priority over the session entry, which may carry // stale lastChannel / lastTo values from a previous channel interaction. 
return mergeDeliveryContext(normalizedRequester, normalizedEntry); @@ -408,182 +245,6 @@ async function maybeQueueSubagentAnnounce(params: { return "none"; } -function queueOutcomeToDeliveryResult( - outcome: "steered" | "queued" | "none", -): SubagentAnnounceDeliveryResult { - if (outcome === "steered") { - return { - delivered: true, - path: "steered", - }; - } - if (outcome === "queued") { - return { - delivered: true, - path: "queued", - }; - } - return { - delivered: false, - path: "none", - }; -} - -async function sendSubagentAnnounceDirectly(params: { - targetRequesterSessionKey: string; - triggerMessage: string; - completionMessage?: string; - expectsCompletionMessage: boolean; - directIdempotencyKey: string; - completionDirectOrigin?: DeliveryContext; - directOrigin?: DeliveryContext; - requesterIsSubagent: boolean; -}): Promise { - const cfg = loadConfig(); - const canonicalRequesterSessionKey = resolveRequesterStoreKey( - cfg, - params.targetRequesterSessionKey, - ); - try { - const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin); - const completionChannelRaw = - typeof completionDirectOrigin?.channel === "string" - ? completionDirectOrigin.channel.trim() - : ""; - const completionChannel = - completionChannelRaw && isDeliverableMessageChannel(completionChannelRaw) - ? completionChannelRaw - : ""; - const completionTo = - typeof completionDirectOrigin?.to === "string" ? completionDirectOrigin.to.trim() : ""; - const hasCompletionDirectTarget = - !params.requesterIsSubagent && Boolean(completionChannel) && Boolean(completionTo); - - if ( - params.expectsCompletionMessage && - hasCompletionDirectTarget && - params.completionMessage?.trim() - ) { - const completionThreadId = - completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== "" - ? 
String(completionDirectOrigin.threadId) - : undefined; - await callGateway({ - method: "send", - params: { - channel: completionChannel, - to: completionTo, - accountId: completionDirectOrigin?.accountId, - threadId: completionThreadId, - sessionKey: canonicalRequesterSessionKey, - message: params.completionMessage, - idempotencyKey: params.directIdempotencyKey, - }, - timeoutMs: 15_000, - }); - - return { - delivered: true, - path: "direct", - }; - } - - const directOrigin = normalizeDeliveryContext(params.directOrigin); - const threadId = - directOrigin?.threadId != null && directOrigin.threadId !== "" - ? String(directOrigin.threadId) - : undefined; - await callGateway({ - method: "agent", - params: { - sessionKey: canonicalRequesterSessionKey, - message: params.triggerMessage, - deliver: !params.requesterIsSubagent, - channel: params.requesterIsSubagent ? undefined : directOrigin?.channel, - accountId: params.requesterIsSubagent ? undefined : directOrigin?.accountId, - to: params.requesterIsSubagent ? undefined : directOrigin?.to, - threadId: params.requesterIsSubagent ? undefined : threadId, - idempotencyKey: params.directIdempotencyKey, - }, - expectFinal: true, - timeoutMs: 15_000, - }); - - return { - delivered: true, - path: "direct", - }; - } catch (err) { - return { - delivered: false, - path: "direct", - error: summarizeDeliveryError(err), - }; - } -} - -async function deliverSubagentAnnouncement(params: { - requesterSessionKey: string; - announceId?: string; - triggerMessage: string; - completionMessage?: string; - summaryLine?: string; - requesterOrigin?: DeliveryContext; - completionDirectOrigin?: DeliveryContext; - directOrigin?: DeliveryContext; - targetRequesterSessionKey: string; - requesterIsSubagent: boolean; - expectsCompletionMessage: boolean; - directIdempotencyKey: string; -}): Promise { - // Non-completion mode mirrors historical behavior: try queued/steered delivery first, - // then (only if not queued) attempt direct delivery. 
- if (!params.expectsCompletionMessage) { - const queueOutcome = await maybeQueueSubagentAnnounce({ - requesterSessionKey: params.requesterSessionKey, - announceId: params.announceId, - triggerMessage: params.triggerMessage, - summaryLine: params.summaryLine, - requesterOrigin: params.requesterOrigin, - }); - const queued = queueOutcomeToDeliveryResult(queueOutcome); - if (queued.delivered) { - return queued; - } - } - - // Completion-mode uses direct send first so manual spawns can return immediately - // in the common ready-to-deliver case. - const direct = await sendSubagentAnnounceDirectly({ - targetRequesterSessionKey: params.targetRequesterSessionKey, - triggerMessage: params.triggerMessage, - completionMessage: params.completionMessage, - directIdempotencyKey: params.directIdempotencyKey, - completionDirectOrigin: params.completionDirectOrigin, - directOrigin: params.directOrigin, - requesterIsSubagent: params.requesterIsSubagent, - expectsCompletionMessage: params.expectsCompletionMessage, - }); - if (direct.delivered || !params.expectsCompletionMessage) { - return direct; - } - - // If completion path failed direct delivery, try queueing as a fallback so the - // report can still be delivered once the requester session is idle. 
- const queueOutcome = await maybeQueueSubagentAnnounce({ - requesterSessionKey: params.requesterSessionKey, - announceId: params.announceId, - triggerMessage: params.triggerMessage, - summaryLine: params.summaryLine, - requesterOrigin: params.requesterOrigin, - }); - if (queueOutcome === "steered" || queueOutcome === "queued") { - return queueOutcomeToDeliveryResult(queueOutcome); - } - - return direct; -} - function loadSessionEntryByKey(sessionKey: string) { const cfg = loadConfig(); const agentId = resolveAgentIdFromSessionKey(sessionKey); @@ -592,6 +253,65 @@ function loadSessionEntryByKey(sessionKey: string) { return store[sessionKey]; } +async function readLatestAssistantReplyWithRetry(params: { + sessionKey: string; + initialReply?: string; + maxWaitMs: number; +}): Promise { + const RETRY_INTERVAL_MS = 100; + let reply = params.initialReply?.trim() ? params.initialReply : undefined; + if (reply) { + return reply; + } + + const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000)); + while (Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS)); + const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey }); + if (latest?.trim()) { + return latest; + } + } + return reply; +} + +function isLikelyWaitingForDescendantResult(reply?: string): boolean { + const text = reply?.trim(); + if (!text) { + return false; + } + const normalized = text.toLowerCase(); + if (!normalized.includes("waiting")) { + return false; + } + return ( + normalized.includes("subagent") || + normalized.includes("child") || + normalized.includes("auto-announce") || + normalized.includes("auto announced") || + normalized.includes("result") + ); +} + +async function waitForAssistantReplyChange(params: { + sessionKey: string; + previousReply?: string; + maxWaitMs: number; +}): Promise { + const RETRY_INTERVAL_MS = 200; + const previous = params.previousReply?.trim() ?? 
""; + const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 30_000)); + while (Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS)); + const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey }); + const normalizedLatest = latest?.trim() ?? ""; + if (normalizedLatest && normalizedLatest !== previous) { + return latest; + } + } + return undefined; +} + export function buildSubagentSystemPrompt(params: { requesterSessionKey?: string; requesterOrigin?: DeliveryContext; @@ -608,7 +328,10 @@ export function buildSubagentSystemPrompt(params: { ? params.task.replace(/\s+/g, " ").trim() : "{{TASK_DESCRIPTION}}"; const childDepth = typeof params.childDepth === "number" ? params.childDepth : 1; - const maxSpawnDepth = typeof params.maxSpawnDepth === "number" ? params.maxSpawnDepth : 1; + const maxSpawnDepth = + typeof params.maxSpawnDepth === "number" + ? params.maxSpawnDepth + : DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH; const canSpawn = childDepth < maxSpawnDepth; const parentLabel = childDepth >= 2 ? "parent orchestrator" : "main agent"; @@ -692,11 +415,7 @@ function buildAnnounceReplyInstruction(params: { remainingActiveSubagentRuns: number; requesterIsSubagent: boolean; announceType: SubagentAnnounceType; - expectsCompletionMessage?: boolean; }): string { - if (params.expectsCompletionMessage) { - return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`; - } if (params.remainingActiveSubagentRuns > 0) { const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs"; return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. 
If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`; @@ -723,10 +442,8 @@ export async function runSubagentAnnounceFlow(params: { label?: string; outcome?: SubagentRunOutcome; announceType?: SubagentAnnounceType; - expectsCompletionMessage?: boolean; }): Promise { let didAnnounce = false; - const expectsCompletionMessage = params.expectsCompletionMessage === true; let shouldDeleteChildSession = params.cleanup === "delete"; try { let targetRequesterSessionKey = params.requesterSessionKey; @@ -742,7 +459,7 @@ export async function runSubagentAnnounceFlow(params: { let outcome: SubagentRunOutcome | undefined = params.outcome; // Lifecycle "end" can arrive before auto-compaction retries finish. If the // subagent is still active, wait for the embedded run to fully settle. - if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) { + if (childSessionId && isEmbeddedPiRunActive(childSessionId)) { const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs); if (!settled && isEmbeddedPiRunActive(childSessionId)) { // The child run is still active (e.g., compaction retry still in progress). 
@@ -787,26 +504,22 @@ export async function runSubagentAnnounceFlow(params: { outcome = { status: "timeout" }; } } - reply = await readLatestSubagentOutput(params.childSessionKey); + reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey }); } if (!reply) { - reply = await readLatestSubagentOutput(params.childSessionKey); + reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey }); } if (!reply?.trim()) { - reply = await readLatestSubagentOutputWithRetry({ + reply = await readLatestAssistantReplyWithRetry({ sessionKey: params.childSessionKey, + initialReply: reply, maxWaitMs: params.timeoutMs, }); } - if ( - !expectsCompletionMessage && - !reply?.trim() && - childSessionId && - isEmbeddedPiRunActive(childSessionId) - ) { + if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) { // Avoid announcing "(no output)" while the child run is still producing output. shouldDeleteChildSession = false; return false; @@ -823,12 +536,46 @@ export async function runSubagentAnnounceFlow(params: { } catch { // Best-effort only; fall back to direct announce behavior when unavailable. } - if (!expectsCompletionMessage && activeChildDescendantRuns > 0) { + if (activeChildDescendantRuns > 0) { // The finished run still has active descendant subagents. Defer announcing // this run until descendants settle so we avoid posting in-progress updates. shouldDeleteChildSession = false; return false; } + // If the subagent reply is still a "waiting for nested result" placeholder, + // hold this announce and wait for the follow-up turn that synthesizes child output. + let hasAnyChildDescendantRuns = false; + try { + const { listDescendantRunsForRequester } = await import("./subagent-registry.js"); + hasAnyChildDescendantRuns = listDescendantRunsForRequester(params.childSessionKey).length > 0; + } catch { + // Best-effort only; fall back to existing behavior when unavailable. 
+ } + if (hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply)) { + const followupReply = await waitForAssistantReplyChange({ + sessionKey: params.childSessionKey, + previousReply: reply, + maxWaitMs: settleTimeoutMs, + }); + if (!followupReply?.trim()) { + shouldDeleteChildSession = false; + return false; + } + reply = followupReply; + try { + const { countActiveDescendantRuns } = await import("./subagent-registry.js"); + activeChildDescendantRuns = Math.max(0, countActiveDescendantRuns(params.childSessionKey)); + } catch { + activeChildDescendantRuns = 0; + } + if ( + activeChildDescendantRuns > 0 || + (hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply)) + ) { + shouldDeleteChildSession = false; + return false; + } + } // Build status label const statusLabel = @@ -843,14 +590,12 @@ export async function runSubagentAnnounceFlow(params: { // Build instructional message for main agent const announceType = params.announceType ?? "subagent task"; const taskLabel = params.label || params.task || "task"; - const subagentName = resolveAgentIdFromSessionKey(params.childSessionKey); const announceSessionId = childSessionId || "unknown"; const findings = reply || "(no output)"; - let completionMessage = ""; let triggerMessage = ""; let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); - let requesterIsSubagent = !expectsCompletionMessage && requesterDepth >= 1; + let requesterIsSubagent = requesterDepth >= 1; // If the requester subagent has already finished, bubble the announce to its // requester (typically main) so descendant completion is not silently lost. 
// BUT: only fallback if the parent SESSION is deleted, not just if the current @@ -903,31 +648,43 @@ export async function runSubagentAnnounceFlow(params: { remainingActiveSubagentRuns, requesterIsSubagent, announceType, - expectsCompletionMessage, }); const statsLine = await buildCompactAnnounceStatsLine({ sessionKey: params.childSessionKey, startedAt: params.startedAt, endedAt: params.endedAt, }); - completionMessage = buildCompletionDeliveryMessage({ - findings, - subagentName, - }); - const internalSummaryMessage = [ + triggerMessage = [ `[System Message] [sessionId: ${announceSessionId}] A ${announceType} "${taskLabel}" just ${statusLabel}.`, "", "Result:", findings, "", statsLine, + "", + replyInstruction, ].join("\n"); - triggerMessage = [internalSummaryMessage, "", replyInstruction].join("\n"); const announceId = buildAnnounceIdFromChildRun({ childSessionKey: params.childSessionKey, childRunId: params.childRunId, }); + const queued = await maybeQueueSubagentAnnounce({ + requesterSessionKey: targetRequesterSessionKey, + announceId, + triggerMessage, + summaryLine: taskLabel, + requesterOrigin: targetRequesterOrigin, + }); + if (queued === "steered") { + didAnnounce = true; + return true; + } + if (queued === "queued") { + didAnnounce = true; + return true; + } + // Send to the requester session. For nested subagents this is an internal // follow-up injection (deliver=false) so the orchestrator receives it. let directOrigin = targetRequesterOrigin; @@ -939,26 +696,26 @@ export async function runSubagentAnnounceFlow(params: { // catches duplicates if this announce is also queued by the gateway- // level message queue while the main session is busy (#17122). 
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId); - const delivery = await deliverSubagentAnnouncement({ - requesterSessionKey: targetRequesterSessionKey, - announceId, - triggerMessage, - completionMessage, - summaryLine: taskLabel, - requesterOrigin: targetRequesterOrigin, - completionDirectOrigin: targetRequesterOrigin, - directOrigin, - targetRequesterSessionKey, - requesterIsSubagent, - expectsCompletionMessage: expectsCompletionMessage, - directIdempotencyKey, + await callGateway({ + method: "agent", + params: { + sessionKey: targetRequesterSessionKey, + message: triggerMessage, + deliver: !requesterIsSubagent, + channel: requesterIsSubagent ? undefined : directOrigin?.channel, + accountId: requesterIsSubagent ? undefined : directOrigin?.accountId, + to: requesterIsSubagent ? undefined : directOrigin?.to, + threadId: + !requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== "" + ? String(directOrigin.threadId) + : undefined, + idempotencyKey: directIdempotencyKey, + }, + expectFinal: true, + timeoutMs: 15_000, }); - didAnnounce = delivery.delivered; - if (!delivery.delivered && delivery.path === "direct" && delivery.error) { - defaultRuntime.error?.( - `Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`, - ); - } + + didAnnounce = true; } catch (err) { defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`); // Best-effort follow-ups; ignore failures to avoid breaking the caller response. 
diff --git a/src/agents/subagent-registry.announce-loop-guard.test.ts b/src/agents/subagent-registry.announce-loop-guard.test.ts index 9c2545228..7064b25b9 100644 --- a/src/agents/subagent-registry.announce-loop-guard.test.ts +++ b/src/agents/subagent-registry.announce-loop-guard.test.ts @@ -151,4 +151,57 @@ describe("announce loop guard (#18264)", () => { const stored = runs.find((run) => run.runId === entry.runId); expect(stored?.cleanupCompletedAt).toBeDefined(); }); + + test("does not consume retry budget while descendants are still active", async () => { + announceFn.mockClear(); + registry.resetSubagentRegistryForTests(); + + const now = Date.now(); + const parentEntry = { + runId: "test-parent-ended", + childSessionKey: "agent:main:subagent:parent-ended", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "agent:main:main", + task: "parent task", + cleanup: "keep" as const, + createdAt: now - 30_000, + startedAt: now - 20_000, + endedAt: now - 10_000, + expectsCompletionMessage: true, + cleanupHandled: false, + }; + const activeDescendant = { + runId: "test-desc-active", + childSessionKey: "agent:main:subagent:parent-ended:subagent:leaf", + requesterSessionKey: "agent:main:subagent:parent-ended", + requesterDisplayKey: "agent:main:subagent:parent-ended", + task: "leaf task", + cleanup: "keep" as const, + createdAt: now - 5_000, + startedAt: now - 5_000, + expectsCompletionMessage: true, + cleanupHandled: false, + }; + + loadSubagentRegistryFromDisk.mockReturnValue( + new Map([ + [parentEntry.runId, parentEntry], + [activeDescendant.runId, activeDescendant], + ]), + ); + + registry.initSubagentRegistry(); + await Promise.resolve(); + await Promise.resolve(); + + expect(announceFn).toHaveBeenCalledWith( + expect.objectContaining({ childRunId: parentEntry.runId }), + ); + const parent = registry + .listSubagentRunsForRequester("agent:main:main") + .find((run) => run.runId === parentEntry.runId); + 
expect(parent?.announceRetryCount).toBeUndefined(); + expect(parent?.cleanupCompletedAt).toBeUndefined(); + expect(parent?.cleanupHandled).toBe(false); + }); }); diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 0e14a2aaa..479a176fb 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -102,7 +102,6 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, - expectsCompletionMessage: entry.expectsCompletionMessage, timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS, cleanup: entry.cleanup, waitForCompletion: false, @@ -324,12 +323,34 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA } if (!didAnnounce) { const now = Date.now(); + const endedAgo = typeof entry.endedAt === "number" ? now - entry.endedAt : 0; + // Normal defer: the run ended, but descendant runs are still active. + // Don't consume retry budget in this state or we can give up before + // descendants finish and before the parent synthesizes the final reply. + const activeDescendantRuns = Math.max(0, countActiveDescendantRuns(entry.childSessionKey)); + if (entry.expectsCompletionMessage === true && activeDescendantRuns > 0) { + if (endedAgo > ANNOUNCE_EXPIRY_MS) { + logAnnounceGiveUp(entry, "expiry"); + entry.cleanupCompletedAt = now; + persistSubagentRuns(); + retryDeferredCompletedAnnounces(runId); + return; + } + entry.lastAnnounceRetryAt = now; + entry.cleanupHandled = false; + resumedRuns.delete(runId); + persistSubagentRuns(); + setTimeout(() => { + resumeSubagentRun(runId); + }, MIN_ANNOUNCE_RETRY_DELAY_MS).unref?.(); + return; + } + const retryCount = (entry.announceRetryCount ?? 0) + 1; entry.announceRetryCount = retryCount; entry.lastAnnounceRetryAt = now; // Check if the announce has exceeded retry limits or expired (#18264). - const endedAgo = typeof entry.endedAt === "number" ? 
now - entry.endedAt : 0; if (retryCount >= MAX_ANNOUNCE_RETRY_COUNT || endedAgo > ANNOUNCE_EXPIRY_MS) { // Give up: mark as completed to break the infinite retry loop. logAnnounceGiveUp(entry, retryCount >= MAX_ANNOUNCE_RETRY_COUNT ? "retry-limit" : "expiry"); diff --git a/src/agents/subagent-spawn.ts b/src/agents/subagent-spawn.ts index f14e9e50e..e65e53a79 100644 --- a/src/agents/subagent-spawn.ts +++ b/src/agents/subagent-spawn.ts @@ -1,5 +1,6 @@ import crypto from "node:crypto"; import { formatThinkingLevels, normalizeThinkLevel } from "../auto-reply/thinking.js"; +import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js"; import { loadConfig } from "../config/config.js"; import { callGateway } from "../gateway/call.js"; import { normalizeAgentId, parseAgentSessionKey } from "../routing/session-key.js"; @@ -107,7 +108,8 @@ export async function spawnSubagentDirect( }); const callerDepth = getSubagentDepthFromSessionStore(requesterInternalKey, { cfg }); - const maxSpawnDepth = cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1; + const maxSpawnDepth = + cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 
DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH; if (callerDepth >= maxSpawnDepth) { return { status: "forbidden", diff --git a/src/agents/system-prompt.e2e.test.ts b/src/agents/system-prompt.e2e.test.ts index a03ac2833..630405313 100644 --- a/src/agents/system-prompt.e2e.test.ts +++ b/src/agents/system-prompt.e2e.test.ts @@ -575,14 +575,15 @@ describe("buildSubagentSystemPrompt", () => { expect(prompt).toContain("instead of full-file `cat`"); }); - it("defaults to depth 1 and maxSpawnDepth 1 when not provided", () => { + it("defaults to depth 1 and maxSpawnDepth 2 when not provided", () => { const prompt = buildSubagentSystemPrompt({ childSessionKey: "agent:main:subagent:abc", task: "basic task", }); - // Should not include spawning guidance (default maxSpawnDepth is 1, depth 1 is leaf) - expect(prompt).not.toContain("## Sub-Agent Spawning"); + // Default maxSpawnDepth is 2, so depth-1 subagents are orchestrators. + expect(prompt).toContain("## Sub-Agent Spawning"); + expect(prompt).toContain("You CAN spawn your own sub-agents"); expect(prompt).toContain("spawned by the main agent"); }); }); diff --git a/src/agents/tools/subagents-tool.ts b/src/agents/tools/subagents-tool.ts index bf88212d6..9b0b75ce8 100644 --- a/src/agents/tools/subagents-tool.ts +++ b/src/agents/tools/subagents-tool.ts @@ -7,6 +7,7 @@ import { sortSubagentRuns, type SubagentTargetResolution, } from "../../auto-reply/reply/subagents-utils.js"; +import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../../config/agent-limits.js"; import { loadConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; import { loadSessionStore, resolveStorePath, updateSessionStore } from "../../config/sessions.js"; @@ -199,7 +200,8 @@ function resolveRequesterKey(params: { // Check if this sub-agent can spawn children (orchestrator). // If so, it should see its own children, not its parent's children. 
const callerDepth = getSubagentDepthFromSessionStore(callerSessionKey, { cfg: params.cfg }); - const maxSpawnDepth = params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1; + const maxSpawnDepth = + params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH; if (callerDepth < maxSpawnDepth) { // Orchestrator sub-agent: use its own session key as requester // so it sees children it spawned. diff --git a/src/config/agent-limits.ts b/src/config/agent-limits.ts index 53df535eb..aa6119920 100644 --- a/src/config/agent-limits.ts +++ b/src/config/agent-limits.ts @@ -2,6 +2,7 @@ import type { OpenClawConfig } from "./types.js"; export const DEFAULT_AGENT_MAX_CONCURRENT = 4; export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 8; +export const DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH = 2; export function resolveAgentMaxConcurrent(cfg?: OpenClawConfig): number { const raw = cfg?.agents?.defaults?.maxConcurrent; diff --git a/src/config/config.agent-concurrency-defaults.test.ts b/src/config/config.agent-concurrency-defaults.test.ts index d2fc38539..acffd872a 100644 --- a/src/config/config.agent-concurrency-defaults.test.ts +++ b/src/config/config.agent-concurrency-defaults.test.ts @@ -3,6 +3,7 @@ import path from "node:path"; import { describe, expect, it } from "vitest"; import { DEFAULT_AGENT_MAX_CONCURRENT, + DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent, @@ -60,6 +61,7 @@ describe("agent concurrency defaults", () => { expect(cfg.agents?.defaults?.maxConcurrent).toBe(DEFAULT_AGENT_MAX_CONCURRENT); expect(cfg.agents?.defaults?.subagents?.maxConcurrent).toBe(DEFAULT_SUBAGENT_MAX_CONCURRENT); + expect(cfg.agents?.defaults?.subagents?.maxSpawnDepth).toBe(DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH); }); }); }); diff --git a/src/config/config.identity-defaults.test.ts b/src/config/config.identity-defaults.test.ts index 6c3d15f9b..edefe2c87 100644 --- 
a/src/config/config.identity-defaults.test.ts +++ b/src/config/config.identity-defaults.test.ts @@ -1,7 +1,11 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it } from "vitest"; -import { DEFAULT_AGENT_MAX_CONCURRENT, DEFAULT_SUBAGENT_MAX_CONCURRENT } from "./agent-limits.js"; +import { + DEFAULT_AGENT_MAX_CONCURRENT, + DEFAULT_SUBAGENT_MAX_CONCURRENT, + DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH, +} from "./agent-limits.js"; import { loadConfig } from "./config.js"; import { withTempHome } from "./home-env.test-harness.js"; @@ -53,6 +57,7 @@ describe("config identity defaults", () => { expect(cfg.agents?.list).toBeUndefined(); expect(cfg.agents?.defaults?.maxConcurrent).toBe(DEFAULT_AGENT_MAX_CONCURRENT); expect(cfg.agents?.defaults?.subagents?.maxConcurrent).toBe(DEFAULT_SUBAGENT_MAX_CONCURRENT); + expect(cfg.agents?.defaults?.subagents?.maxSpawnDepth).toBe(DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH); expect(cfg.session).toBeUndefined(); }); }); diff --git a/src/config/defaults.ts b/src/config/defaults.ts index 09605388a..bebdeaf8c 100644 --- a/src/config/defaults.ts +++ b/src/config/defaults.ts @@ -1,6 +1,10 @@ import { DEFAULT_CONTEXT_TOKENS } from "../agents/defaults.js"; import { parseModelRef } from "../agents/model-selection.js"; -import { DEFAULT_AGENT_MAX_CONCURRENT, DEFAULT_SUBAGENT_MAX_CONCURRENT } from "./agent-limits.js"; +import { + DEFAULT_AGENT_MAX_CONCURRENT, + DEFAULT_SUBAGENT_MAX_CONCURRENT, + DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH, +} from "./agent-limits.js"; import { resolveTalkApiKey } from "./talk.js"; import type { OpenClawConfig } from "./types.js"; import type { ModelDefinitionConfig } from "./types.models.js"; @@ -299,7 +303,10 @@ export function applyAgentDefaults(cfg: OpenClawConfig): OpenClawConfig { const hasSubMax = typeof defaults?.subagents?.maxConcurrent === "number" && Number.isFinite(defaults.subagents.maxConcurrent); - if (hasMax && hasSubMax) { + const hasMaxSpawnDepth = + typeof 
defaults?.subagents?.maxSpawnDepth === "number" && + Number.isFinite(defaults.subagents.maxSpawnDepth); + if (hasMax && hasSubMax && hasMaxSpawnDepth) { return cfg; } @@ -315,6 +322,10 @@ export function applyAgentDefaults(cfg: OpenClawConfig): OpenClawConfig { nextSubagents.maxConcurrent = DEFAULT_SUBAGENT_MAX_CONCURRENT; mutated = true; } + if (!hasMaxSpawnDepth) { + nextSubagents.maxSpawnDepth = DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH; + mutated = true; + } if (!mutated) { return cfg; diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index aa3fbe419..75e715c73 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -241,7 +241,7 @@ export type AgentDefaultsConfig = { subagents?: { /** Max concurrent sub-agent runs (global lane: "subagent"). Default: 1. */ maxConcurrent?: number; - /** Maximum depth allowed for sessions_spawn chains. Default behavior: 1 (no nested spawns). */ + /** Maximum depth allowed for sessions_spawn chains. Default behavior: 2 (allows nested spawns). */ maxSpawnDepth?: number; /** Maximum active children a single requester session may spawn. Default behavior: 5. */ maxChildrenPerAgent?: number; diff --git a/src/config/zod-schema.agent-defaults.ts b/src/config/zod-schema.agent-defaults.ts index 4ec06f66b..201706ac9 100644 --- a/src/config/zod-schema.agent-defaults.ts +++ b/src/config/zod-schema.agent-defaults.ts @@ -150,7 +150,7 @@ export const AgentDefaultsSchema = z .max(5) .optional() .describe( - "Maximum nesting depth for sub-agent spawning. 1 = no nesting (default), 2 = sub-agents can spawn sub-sub-agents.", + "Maximum nesting depth for sub-agent spawning. 
Default is 2 (sub-agents can spawn sub-sub-agents).", ), maxChildrenPerAgent: z .number() diff --git a/src/cron/isolated-agent/session.test.ts b/src/cron/isolated-agent/session.test.ts index ead8313ee..75dec7080 100644 --- a/src/cron/isolated-agent/session.test.ts +++ b/src/cron/isolated-agent/session.test.ts @@ -4,11 +4,9 @@ import type { OpenClawConfig } from "../../config/config.js"; vi.mock("../../config/sessions.js", () => ({ loadSessionStore: vi.fn(), resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"), - evaluateSessionFreshness: vi.fn().mockReturnValue({ fresh: true }), - resolveSessionResetPolicy: vi.fn().mockReturnValue({ mode: "idle", idleMinutes: 60 }), })); -import { loadSessionStore, evaluateSessionFreshness } from "../../config/sessions.js"; +import { loadSessionStore } from "../../config/sessions.js"; import { resolveCronSession } from "./session.js"; const NOW_MS = 1_737_600_000_000; @@ -17,25 +15,18 @@ type SessionStore = ReturnType; type SessionStoreEntry = SessionStore[string]; type MockSessionStoreEntry = Partial; -function resolveWithStoredEntry(params?: { - sessionKey?: string; - entry?: MockSessionStoreEntry; - forceNew?: boolean; - fresh?: boolean; -}) { +function resolveWithStoredEntry(params?: { sessionKey?: string; entry?: MockSessionStoreEntry }) { const sessionKey = params?.sessionKey ?? "webhook:stable-key"; const store: SessionStore = params?.entry ? ({ [sessionKey]: params.entry as SessionStoreEntry } as SessionStore) : {}; vi.mocked(loadSessionStore).mockReturnValue(store); - vi.mocked(evaluateSessionFreshness).mockReturnValue({ fresh: params?.fresh ?? 
true }); return resolveCronSession({ cfg: {} as OpenClawConfig, sessionKey, agentId: "main", nowMs: NOW_MS, - forceNew: params?.forceNew, }); } @@ -85,76 +76,51 @@ describe("resolveCronSession", () => { expect(result.isNewSession).toBe(true); }); - // New tests for session reuse behavior (#18027) - describe("session reuse for webhooks/cron", () => { - it("reuses existing sessionId when session is fresh", () => { - const result = resolveWithStoredEntry({ - entry: { - sessionId: "existing-session-id-123", - updatedAt: NOW_MS - 1000, - systemSent: true, - }, - fresh: true, - }); - - expect(result.sessionEntry.sessionId).toBe("existing-session-id-123"); - expect(result.isNewSession).toBe(false); - expect(result.systemSent).toBe(true); + it("always creates a new sessionId for cron/webhook runs", () => { + const result = resolveWithStoredEntry({ + entry: { + sessionId: "existing-session-id-123", + updatedAt: NOW_MS - 1000, + systemSent: true, + }, }); - it("creates new sessionId when session is stale", () => { - const result = resolveWithStoredEntry({ - entry: { - sessionId: "old-session-id", - updatedAt: NOW_MS - 86_400_000, // 1 day ago - systemSent: true, - modelOverride: "gpt-4.1-mini", - providerOverride: "openai", - sendPolicy: "allow", - }, - fresh: false, - }); + expect(result.sessionEntry.sessionId).not.toBe("existing-session-id-123"); + expect(result.isNewSession).toBe(true); + expect(result.systemSent).toBe(false); + }); - expect(result.sessionEntry.sessionId).not.toBe("old-session-id"); - expect(result.isNewSession).toBe(true); - expect(result.systemSent).toBe(false); - expect(result.sessionEntry.modelOverride).toBe("gpt-4.1-mini"); - expect(result.sessionEntry.providerOverride).toBe("openai"); - expect(result.sessionEntry.sendPolicy).toBe("allow"); + it("preserves overrides while rolling a new sessionId", () => { + const result = resolveWithStoredEntry({ + entry: { + sessionId: "old-session-id", + updatedAt: NOW_MS - 86_400_000, + systemSent: true, + 
modelOverride: "gpt-4.1-mini", + providerOverride: "openai", + sendPolicy: "allow", + }, }); - it("creates new sessionId when forceNew is true", () => { - const result = resolveWithStoredEntry({ - entry: { - sessionId: "existing-session-id-456", - updatedAt: NOW_MS - 1000, - systemSent: true, - modelOverride: "sonnet-4", - providerOverride: "anthropic", - }, - fresh: true, - forceNew: true, - }); + expect(result.sessionEntry.sessionId).not.toBe("old-session-id"); + expect(result.isNewSession).toBe(true); + expect(result.systemSent).toBe(false); + expect(result.sessionEntry.modelOverride).toBe("gpt-4.1-mini"); + expect(result.sessionEntry.providerOverride).toBe("openai"); + expect(result.sessionEntry.sendPolicy).toBe("allow"); + }); - expect(result.sessionEntry.sessionId).not.toBe("existing-session-id-456"); - expect(result.isNewSession).toBe(true); - expect(result.systemSent).toBe(false); - expect(result.sessionEntry.modelOverride).toBe("sonnet-4"); - expect(result.sessionEntry.providerOverride).toBe("anthropic"); + it("creates new sessionId when entry exists but has no sessionId", () => { + const result = resolveWithStoredEntry({ + entry: { + updatedAt: NOW_MS - 1000, + modelOverride: "some-model", + }, }); - it("creates new sessionId when entry exists but has no sessionId", () => { - const result = resolveWithStoredEntry({ - entry: { - updatedAt: NOW_MS - 1000, - modelOverride: "some-model", - }, - }); - - expect(result.sessionEntry.sessionId).toBeDefined(); - expect(result.isNewSession).toBe(true); - // Should still preserve other fields from entry - expect(result.sessionEntry.modelOverride).toBe("some-model"); - }); + expect(result.sessionEntry.sessionId).toBeDefined(); + expect(result.isNewSession).toBe(true); + // Should still preserve other fields from entry + expect(result.sessionEntry.modelOverride).toBe("some-model"); }); }); diff --git a/src/cron/isolated-agent/session.ts b/src/cron/isolated-agent/session.ts index 0f23c836c..f9fec3ce8 100644 --- 
a/src/cron/isolated-agent/session.ts +++ b/src/cron/isolated-agent/session.ts @@ -1,19 +1,12 @@ import crypto from "node:crypto"; import type { OpenClawConfig } from "../../config/config.js"; -import { - evaluateSessionFreshness, - loadSessionStore, - resolveSessionResetPolicy, - resolveStorePath, - type SessionEntry, -} from "../../config/sessions.js"; +import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js"; export function resolveCronSession(params: { cfg: OpenClawConfig; sessionKey: string; nowMs: number; agentId: string; - forceNew?: boolean; }) { const sessionCfg = params.cfg.session; const storePath = resolveStorePath(sessionCfg?.store, { @@ -21,42 +14,8 @@ export function resolveCronSession(params: { }); const store = loadSessionStore(storePath); const entry = store[params.sessionKey]; - - // Check if we can reuse an existing session - let sessionId: string; - let isNewSession: boolean; - let systemSent: boolean; - - if (!params.forceNew && entry?.sessionId) { - // Evaluate freshness using the configured reset policy - // Cron/webhook sessions use "direct" reset type (1:1 conversation style) - const resetPolicy = resolveSessionResetPolicy({ - sessionCfg, - resetType: "direct", - }); - const freshness = evaluateSessionFreshness({ - updatedAt: entry.updatedAt, - now: params.nowMs, - policy: resetPolicy, - }); - - if (freshness.fresh) { - // Reuse existing session - sessionId = entry.sessionId; - isNewSession = false; - systemSent = entry.systemSent ?? false; - } else { - // Session expired, create new - sessionId = crypto.randomUUID(); - isNewSession = true; - systemSent = false; - } - } else { - // No existing session or forced new - sessionId = crypto.randomUUID(); - isNewSession = true; - systemSent = false; - } + const sessionId = crypto.randomUUID(); + const systemSent = false; const sessionEntry: SessionEntry = { // Preserve existing per-session overrides even when rolling to a new sessionId. 
@@ -66,5 +25,5 @@ export function resolveCronSession(params: { updatedAt: params.nowMs, systemSent, }; - return { storePath, store, sessionEntry, systemSent, isNewSession }; + return { storePath, store, sessionEntry, systemSent, isNewSession: true }; }