diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-captures-threadid.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-captures-threadid.test.ts new file mode 100644 index 000000000..39d44ed7e --- /dev/null +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-captures-threadid.test.ts @@ -0,0 +1,99 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const callGatewayMock = vi.fn(); +vi.mock("../gateway/call.js", () => ({ + callGateway: (opts: unknown) => callGatewayMock(opts), +})); + +let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { + session: { + mainKey: "main", + scope: "per-sender", + }, +}; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => configOverride, + resolveGatewayPort: () => 18789, + }; +}); + +import "./test-helpers/fast-core-tools.js"; +import { createOpenClawTools } from "./openclaw-tools.js"; +import { + listSubagentRunsForRequester, + resetSubagentRegistryForTests, +} from "./subagent-registry.js"; + +describe("sessions_spawn requesterOrigin threading", () => { + beforeEach(() => { + resetSubagentRegistryForTests(); + callGatewayMock.mockReset(); + configOverride = { + session: { + mainKey: "main", + scope: "per-sender", + }, + }; + + callGatewayMock.mockImplementation(async (opts: unknown) => { + const req = opts as { method?: string }; + if (req.method === "agent") { + return { runId: "run-1", status: "accepted", acceptedAt: 1 }; + } + // Prevent background announce flow by returning a non-terminal status. + if (req.method === "agent.wait") { + return { runId: "run-1", status: "running" }; + } + return {}; + }); + }); + + it("captures threadId in requesterOrigin", async () => { + const tool = createOpenClawTools({ + agentSessionKey: "main", + agentChannel: "telegram", + agentTo: "telegram:123", + agentThreadId: 42, + }).find((candidate) => candidate.name === "sessions_spawn"); + if (!tool) { + throw new Error("missing sessions_spawn tool"); + } + + await tool.execute("call", { + task: "do thing", + runTimeoutSeconds: 1, + }); + + const runs = listSubagentRunsForRequester("main"); + expect(runs).toHaveLength(1); + expect(runs[0]?.requesterOrigin).toMatchObject({ + channel: "telegram", + to: "telegram:123", + threadId: 42, + }); + }); + + it("stores requesterOrigin without threadId when none is provided", async () => { + const tool = createOpenClawTools({ + agentSessionKey: "main", + agentChannel: "telegram", + agentTo: "telegram:123", + }).find((candidate) => candidate.name === "sessions_spawn"); + if (!tool) { + throw new Error("missing sessions_spawn tool"); + } + + await tool.execute("call", { + task: "do thing", + runTimeoutSeconds: 1, + }); + + const runs = listSubagentRunsForRequester("main"); + expect(runs).toHaveLength(1); + expect(runs[0]?.requesterOrigin?.threadId).toBeUndefined(); + }); +}); diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index a75e03df6..e00aae60a 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -198,6 +198,85 @@ describe("subagent announce formatting", () => { expect(call?.params?.accountId).toBe("kev"); }); + it("includes threadId when origin has an active topic/thread", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-thread", + lastChannel: "telegram", + lastTo: "telegram:123", + lastThreadId: 42, + queueMode: "collect", + queueDebounceMs: 0, + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-thread", + requesterSessionKey: "main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(1); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("telegram"); + expect(call?.params?.to).toBe("telegram:123"); + expect(call?.params?.threadId).toBe("42"); + }); + + it("prefers requesterOrigin.threadId over session entry threadId", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-thread-override", + lastChannel: "telegram", + lastTo: "telegram:123", + lastThreadId: 42, + queueMode: "collect", + queueDebounceMs: 0, + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-thread-override", + requesterSessionKey: "main", + requesterDisplayKey: "main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + threadId: 99, + }, + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(true); + await expect.poll(() => agentSpy.mock.calls.length).toBe(1); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.threadId).toBe("99"); + }); + it("splits collect-mode queues when accountId differs", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); diff --git a/src/infra/outbound/message-action-runner.threading.test.ts b/src/infra/outbound/message-action-runner.threading.test.ts index b467823dd..4ff3ac3f7 100644 --- a/src/infra/outbound/message-action-runner.threading.test.ts +++ b/src/infra/outbound/message-action-runner.threading.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import { slackPlugin } from "../../../extensions/slack/src/channel.js"; +import { telegramPlugin } from "../../../extensions/telegram/src/channel.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; @@ -40,12 +41,22 @@ const slackConfig = { }, } as OpenClawConfig; -describe("runMessageAction Slack threading", () => { +const telegramConfig = { + channels: { + telegram: { + botToken: "telegram-test", + }, + }, +} as OpenClawConfig; + +describe("runMessageAction threading auto-injection", () => { beforeEach(async () => { const { createPluginRuntime } = await import("../../plugins/runtime/index.js"); const { setSlackRuntime } = await import("../../../extensions/slack/src/runtime.js"); + const { setTelegramRuntime } = await import("../../../extensions/telegram/src/runtime.js"); const runtime = createPluginRuntime(); setSlackRuntime(runtime); + setTelegramRuntime(runtime); setActivePluginRegistry( createTestRegistry([ { @@ -53,6 +64,11 @@ describe("runMessageAction Slack threading", () => { source: "test", plugin: slackPlugin, }, + { + pluginId: "telegram", + source: "test", + plugin: telegramPlugin, + }, ]), ); }); @@ -114,4 +130,55 @@ describe("runMessageAction Slack threading", () => { const call = mocks.executeSendAction.mock.calls[0]?.[0]; expect(call?.ctx?.mirror?.sessionKey).toBe("agent:main:slack:channel:c123:thread:333.444"); }); + + it("auto-injects telegram threadId from toolContext when omitted", async () => { + mocks.executeSendAction.mockResolvedValue({ + handledBy: "plugin", + payload: {}, + }); + + await runMessageAction({ + cfg: telegramConfig, + action: "send", + params: { + channel: "telegram", + target: "telegram:123", + message: "hi", + }, + toolContext: { + currentChannelId: "telegram:123", + currentThreadTs: "42", + }, + agentId: "main", + }); + + const call = mocks.executeSendAction.mock.calls[0]?.[0] as { ctx?: { params?: any } }; + expect(call?.ctx?.params?.threadId).toBe("42"); + }); + + it("uses explicit telegram threadId when provided", async () => { + mocks.executeSendAction.mockResolvedValue({ + handledBy: "plugin", + payload: {}, + }); + + await runMessageAction({ + cfg: telegramConfig, + action: "send", + params: { + channel: "telegram", + target: "telegram:123", + message: "hi", + threadId: "999", + }, + toolContext: { + currentChannelId: "telegram:123", + currentThreadTs: "42", + }, + agentId: "main", + }); + + const call = mocks.executeSendAction.mock.calls[0]?.[0] as { ctx?: { params?: any } }; + expect(call?.ctx?.params?.threadId).toBe("999"); + }); }); diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index f75f94246..c9487415c 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -20,7 +20,7 @@ import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js import { dispatchChannelMessageAction } from "../../channels/plugins/message-actions.js"; import { extensionForMime } from "../../media/mime.js"; import { parseSlackTarget } from "../../slack/targets.js"; -import { parseTelegramTarget } from "../../telegram/targets.js"; +// parseTelegramTarget no longer used (telegram auto-threading uses string matching) import { isDeliverableMessageChannel, normalizeMessageChannel, @@ -259,13 +259,12 @@ function resolveTelegramAutoThreadId(params: { if (!context?.currentThreadTs || !context.currentChannelId) { return undefined; } - // Parse both targets to extract base chat IDs, ignoring topic suffixes and - // internal prefixes (e.g. "telegram:group:123:topic:456" → "123"). - // This mirrors Slack's parseSlackTarget approach — compare canonical chat IDs - // so auto-threading applies even when representations differ. - const parsedTo = parseTelegramTarget(params.to); - const parsedChannel = parseTelegramTarget(context.currentChannelId); - if (parsedTo.chatId.toLowerCase() !== parsedChannel.chatId.toLowerCase()) { + // Only apply when the target matches the originating chat. + // Note: Telegram topic routing is carried via threadId/message_thread_id; + // `currentChannelId` (and most agent targets) are typically the base chat id. + const normalizedTo = params.to.trim().toLowerCase(); + const normalizedChannel = context.currentChannelId.trim().toLowerCase(); + if (normalizedTo !== normalizedChannel) { return undefined; } return context.currentThreadTs;