From 6a8081a7f33ea7ee32a957908c7ad2fd0136c09e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 8 Mar 2026 02:02:00 +0000 Subject: [PATCH] refactor(routing): centralize inbound last-route policy --- src/acp/persistent-bindings.route.ts | 5 +++ src/discord/monitor/route-resolution.test.ts | 3 ++ src/discord/monitor/route-resolution.ts | 5 +++ src/routing/resolve-route.test.ts | 45 ++++++++++++++++++- src/routing/resolve-route.ts | 17 +++++++ ...t-message-context.named-account-dm.test.ts | 34 +++++++++++++- src/telegram/bot-message-context.ts | 22 +++++++-- src/telegram/conversation-route.ts | 18 ++++++++ src/web/auto-reply/monitor/broadcast.ts | 19 +++++++- src/web/auto-reply/monitor/process-message.ts | 13 ++++-- 10 files changed, 172 insertions(+), 9 deletions(-) diff --git a/src/acp/persistent-bindings.route.ts b/src/acp/persistent-bindings.route.ts index 9436d930d..d11d46d42 100644 --- a/src/acp/persistent-bindings.route.ts +++ b/src/acp/persistent-bindings.route.ts @@ -1,5 +1,6 @@ import type { OpenClawConfig } from "../config/config.js"; import type { ResolvedAgentRoute } from "../routing/resolve-route.js"; +import { deriveLastRoutePolicy } from "../routing/resolve-route.js"; import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; import { ensureConfiguredAcpBindingSession, @@ -50,6 +51,10 @@ export function resolveConfiguredAcpRoute(params: { ...params.route, sessionKey: boundSessionKey, agentId: boundAgentId, + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey: boundSessionKey, + mainSessionKey: params.route.mainSessionKey, + }), matchedBy: "binding.channel", }, }; diff --git a/src/discord/monitor/route-resolution.test.ts b/src/discord/monitor/route-resolution.test.ts index f64023cd0..d9ec90177 100644 --- a/src/discord/monitor/route-resolution.test.ts +++ b/src/discord/monitor/route-resolution.test.ts @@ -30,6 +30,7 @@ describe("discord route resolution helpers", () => { accountId: "default", sessionKey: "agent:main:discord:channel:c1", mainSessionKey: "agent:main:main", + lastRoutePolicy: "session", matchedBy: "default", }; @@ -54,6 +55,7 @@ describe("discord route resolution helpers", () => { accountId: "default", sessionKey: "agent:main:discord:channel:c1", mainSessionKey: "agent:main:main", + lastRoutePolicy: "session", matchedBy: "default", }; const configuredRoute = { @@ -62,6 +64,7 @@ describe("discord route resolution helpers", () => { agentId: "worker", sessionKey: "agent:worker:discord:channel:c1", mainSessionKey: "agent:worker:main", + lastRoutePolicy: "session" as const, matchedBy: "binding.peer" as const, }, }; diff --git a/src/discord/monitor/route-resolution.ts b/src/discord/monitor/route-resolution.ts index b0284ff77..2e65ff639 100644 --- a/src/discord/monitor/route-resolution.ts +++ b/src/discord/monitor/route-resolution.ts @@ -1,5 +1,6 @@ import type { OpenClawConfig } from "../../config/config.js"; import { + deriveLastRoutePolicy, resolveAgentRoute, type ResolvedAgentRoute, type RoutePeer, @@ -90,6 +91,10 @@ export function resolveDiscordEffectiveRoute(params: { ...params.route, sessionKey: boundSessionKey, agentId: resolveAgentIdFromSessionKey(boundSessionKey), + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey: boundSessionKey, + mainSessionKey: params.route.mainSessionKey, + }), ...(params.matchedBy ? { matchedBy: params.matchedBy } : {}), }; } diff --git a/src/routing/resolve-route.test.ts b/src/routing/resolve-route.test.ts index 00bc55c35..3e2c9c4d5 100644 --- a/src/routing/resolve-route.test.ts +++ b/src/routing/resolve-route.test.ts @@ -2,7 +2,11 @@ import { describe, expect, test, vi } from "vitest"; import type { ChatType } from "../channels/chat-type.js"; import type { OpenClawConfig } from "../config/config.js"; import * as routingBindings from "./bindings.js"; -import { resolveAgentRoute } from "./resolve-route.js"; +import { + deriveLastRoutePolicy, + resolveAgentRoute, + resolveInboundLastRouteSessionKey, +} from "./resolve-route.js"; describe("resolveAgentRoute", () => { const resolveDiscordGuildRoute = (cfg: OpenClawConfig) => @@ -25,6 +29,7 @@ describe("resolveAgentRoute", () => { expect(route.agentId).toBe("main"); expect(route.accountId).toBe("default"); expect(route.sessionKey).toBe("agent:main:main"); + expect(route.lastRoutePolicy).toBe("main"); expect(route.matchedBy).toBe("default"); }); @@ -47,9 +52,47 @@ describe("resolveAgentRoute", () => { peer: { kind: "direct", id: "+15551234567" }, }); expect(route.sessionKey).toBe(testCase.expected); + expect(route.lastRoutePolicy).toBe("session"); } }); + test("resolveInboundLastRouteSessionKey follows route policy", () => { + expect( + resolveInboundLastRouteSessionKey({ + route: { + mainSessionKey: "agent:main:main", + lastRoutePolicy: "main", + }, + sessionKey: "agent:main:discord:direct:user-1", + }), + ).toBe("agent:main:main"); + + expect( + resolveInboundLastRouteSessionKey({ + route: { + mainSessionKey: "agent:main:main", + lastRoutePolicy: "session", + }, + sessionKey: "agent:main:telegram:atlas:direct:123", + }), + ).toBe("agent:main:telegram:atlas:direct:123"); + }); + + test("deriveLastRoutePolicy collapses only main-session routes", () => { + expect( + deriveLastRoutePolicy({ + sessionKey: "agent:main:main", + mainSessionKey: "agent:main:main", + }), + ).toBe("main"); + expect( + deriveLastRoutePolicy({ + sessionKey: "agent:main:telegram:direct:123", + mainSessionKey: "agent:main:main", + }), + ).toBe("session"); + }); + test("identityLinks applies to direct-message scopes", () => { const cases = [ { diff --git a/src/routing/resolve-route.ts b/src/routing/resolve-route.ts index 29a7d9c11..f56fdc131 100644 --- a/src/routing/resolve-route.ts +++ b/src/routing/resolve-route.ts @@ -44,6 +44,8 @@ export type ResolvedAgentRoute = { sessionKey: string; /** Convenience alias for direct-chat collapse. */ mainSessionKey: string; + /** Which session should receive inbound last-route updates. */ + lastRoutePolicy: "main" | "session"; /** Match description for debugging/logging. */ matchedBy: | "binding.peer" @@ -58,6 +60,20 @@ export type ResolvedAgentRoute = { export { DEFAULT_ACCOUNT_ID, DEFAULT_AGENT_ID } from "./session-key.js"; +export function deriveLastRoutePolicy(params: { + sessionKey: string; + mainSessionKey: string; +}): ResolvedAgentRoute["lastRoutePolicy"] { + return params.sessionKey === params.mainSessionKey ? "main" : "session"; +} + +export function resolveInboundLastRouteSessionKey(params: { + route: Pick; + sessionKey: string; +}): string { + return params.route.lastRoutePolicy === "main" ? params.route.mainSessionKey : params.sessionKey; +} + function normalizeToken(value: string | undefined | null): string { return (value ?? "").trim().toLowerCase(); } @@ -662,6 +678,7 @@ export function resolveAgentRoute(input: ResolveAgentRouteInput): ResolvedAgentR accountId, sessionKey, mainSessionKey, + lastRoutePolicy: deriveLastRoutePolicy({ sessionKey, mainSessionKey }), matchedBy, }; if (routeCache && routeCacheKey) { diff --git a/src/telegram/bot-message-context.named-account-dm.test.ts b/src/telegram/bot-message-context.named-account-dm.test.ts index e7ddf3455..c48fb17fe 100644 --- a/src/telegram/bot-message-context.named-account-dm.test.ts +++ b/src/telegram/bot-message-context.named-account-dm.test.ts @@ -1,7 +1,12 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { clearRuntimeConfigSnapshot, setRuntimeConfigSnapshot } from "../config/config.js"; import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js"; +const recordInboundSessionMock = vi.fn().mockResolvedValue(undefined); +vi.mock("../channels/session.js", () => ({ + recordInboundSession: (...args: unknown[]) => recordInboundSessionMock(...args), +})); + describe("buildTelegramMessageContext named-account DM fallback", () => { const baseCfg = { agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/openclaw" } }, @@ -11,8 +16,16 @@ describe("buildTelegramMessageContext named-account DM fallback", () => { afterEach(() => { clearRuntimeConfigSnapshot(); + recordInboundSessionMock.mockClear(); }); + function getLastUpdateLastRoute(): { sessionKey?: string } | undefined { + const callArgs = recordInboundSessionMock.mock.calls.at(-1)?.[0] as { + updateLastRoute?: { sessionKey?: string }; + }; + return callArgs?.updateLastRoute; + } + it("allows DM through for a named account with no explicit binding", async () => { setRuntimeConfigSnapshot(baseCfg); @@ -51,6 +64,25 @@ describe("buildTelegramMessageContext named-account DM fallback", () => { expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:telegram:atlas:direct:814912386"); }); + it("keeps named-account fallback lastRoute on the isolated DM session", async () => { + setRuntimeConfigSnapshot(baseCfg); + + const ctx = await buildTelegramMessageContextForTest({ + cfg: baseCfg, + accountId: "atlas", + message: { + message_id: 1, + chat: { id: 814912386, type: "private" }, + date: 1700000000, + text: "hello", + from: { id: 814912386, first_name: "Alice" }, + }, + }); + + expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:telegram:atlas:direct:814912386"); + expect(getLastUpdateLastRoute()?.sessionKey).toBe("agent:main:telegram:atlas:direct:814912386"); + }); + it("isolates sessions between named accounts that share the default agent", async () => { setRuntimeConfigSnapshot(baseCfg); diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 88e530a98..674c01b89 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -39,7 +39,11 @@ import type { } from "../config/types.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; -import { buildAgentSessionKey } from "../routing/resolve-route.js"; +import { + buildAgentSessionKey, + deriveLastRoutePolicy, + resolveInboundLastRouteSessionKey, +} from "../routing/resolve-route.js"; import { DEFAULT_ACCOUNT_ID, resolveThreadSessionKeys } from "../routing/session-key.js"; import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; @@ -362,6 +366,14 @@ export const buildTelegramMessageContext = async ({ ? resolveThreadSessionKeys({ baseSessionKey, threadId: `${chatId}:${dmThreadId}` }) : null; const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; + route = { + ...route, + sessionKey, + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey, + mainSessionKey: route.mainSessionKey, + }), + }; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); // Compute requireMention after access checks and final route selection. const activationOverride = resolveGroupActivation({ @@ -832,6 +844,10 @@ export const buildTelegramMessageContext = async ({ normalizeEntry: (entry) => normalizeAllowFrom([entry]).entries[0], }) : null; + const updateLastRouteSessionKey = resolveInboundLastRouteSessionKey({ + route, + sessionKey, + }); await recordInboundSession({ storePath, @@ -839,14 +855,14 @@ export const buildTelegramMessageContext = async ({ ctx: ctxPayload, updateLastRoute: !isGroup ? { - sessionKey: route.mainSessionKey, + sessionKey: updateLastRouteSessionKey, channel: "telegram", to: `telegram:${chatId}`, accountId: route.accountId, // Preserve DM topic threadId for replies (fixes #8891) threadId: dmThreadId != null ? String(dmThreadId) : undefined, mainDmOwnerPin: - pinnedMainDmOwner && senderId + updateLastRouteSessionKey === route.mainSessionKey && pinnedMainDmOwner && senderId ? { ownerRecipient: pinnedMainDmOwner, senderRecipient: senderId, diff --git a/src/telegram/conversation-route.ts b/src/telegram/conversation-route.ts index 478e9049f..32088b818 100644 --- a/src/telegram/conversation-route.ts +++ b/src/telegram/conversation-route.ts @@ -4,6 +4,7 @@ import { logVerbose } from "../globals.js"; import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; import { buildAgentSessionKey, + deriveLastRoutePolicy, pickFirstExistingAgentId, resolveAgentRoute, } from "../routing/resolve-route.js"; @@ -67,6 +68,19 @@ export function resolveTelegramConversationRoute(params: { mainSessionKey: buildAgentMainSessionKey({ agentId: topicAgentId, }).toLowerCase(), + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey: buildAgentSessionKey({ + agentId: topicAgentId, + channel: "telegram", + accountId: params.accountId, + peer: { kind: params.isGroup ? "group" : "direct", id: peerId }, + dmScope: params.cfg.session?.dmScope, + identityLinks: params.cfg.session?.identityLinks, + }).toLowerCase(), + mainSessionKey: buildAgentMainSessionKey({ + agentId: topicAgentId, + }).toLowerCase(), + }), }; logVerbose( `telegram: topic route override: topic=${params.resolvedThreadId ?? params.replyThreadId} agent=${topicAgentId} sessionKey=${route.sessionKey}`, @@ -103,6 +117,10 @@ export function resolveTelegramConversationRoute(params: { ...route, sessionKey: boundSessionKey, agentId: resolveAgentIdFromSessionKey(boundSessionKey), + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey: boundSessionKey, + mainSessionKey: route.mainSessionKey, + }), matchedBy: "binding.channel", }; configuredBinding = null; diff --git a/src/web/auto-reply/monitor/broadcast.ts b/src/web/auto-reply/monitor/broadcast.ts index 88c0670fe..d220c9a82 100644 --- a/src/web/auto-reply/monitor/broadcast.ts +++ b/src/web/auto-reply/monitor/broadcast.ts @@ -1,6 +1,6 @@ import type { loadConfig } from "../../../config/config.js"; import type { resolveAgentRoute } from "../../../routing/resolve-route.js"; -import { buildAgentSessionKey } from "../../../routing/resolve-route.js"; +import { buildAgentSessionKey, deriveLastRoutePolicy } from "../../../routing/resolve-route.js"; import { buildAgentMainSessionKey, DEFAULT_MAIN_KEY, @@ -70,6 +70,23 @@ export async function maybeBroadcastMessage(params: { agentId: normalizedAgentId, mainKey: DEFAULT_MAIN_KEY, }), + lastRoutePolicy: deriveLastRoutePolicy({ + sessionKey: buildAgentSessionKey({ + agentId: normalizedAgentId, + channel: "whatsapp", + accountId: params.route.accountId, + peer: { + kind: params.msg.chatType === "group" ? "group" : "direct", + id: params.peerId, + }, + dmScope: params.cfg.session?.dmScope, + identityLinks: params.cfg.session?.identityLinks, + }), + mainSessionKey: buildAgentMainSessionKey({ + agentId: normalizedAgentId, + mainKey: DEFAULT_MAIN_KEY, + }), + }), }; try { diff --git a/src/web/auto-reply/monitor/process-message.ts b/src/web/auto-reply/monitor/process-message.ts index ff6d186da..b9e799377 100644 --- a/src/web/auto-reply/monitor/process-message.ts +++ b/src/web/auto-reply/monitor/process-message.ts @@ -19,7 +19,10 @@ import { recordSessionMetaFromInbound } from "../../../config/sessions.js"; import { logVerbose, shouldLogVerbose } from "../../../globals.js"; import type { getChildLogger } from "../../../logging.js"; import { getAgentScopedMediaLocalRoots } from "../../../media/local-roots.js"; -import type { resolveAgentRoute } from "../../../routing/resolve-route.js"; +import { + resolveInboundLastRouteSessionKey, + type resolveAgentRoute, +} from "../../../routing/resolve-route.js"; import { readStoreAllowFromForDmPolicy, resolvePinnedMainDmOwnerFromAllowlist, @@ -339,9 +342,13 @@ export async function processMessage(params: { }); const shouldUpdateMainLastRoute = !pinnedMainDmRecipient || pinnedMainDmRecipient === dmRouteTarget; + const inboundLastRouteSessionKey = resolveInboundLastRouteSessionKey({ + route: params.route, + sessionKey: params.route.sessionKey, + }); if ( dmRouteTarget && - params.route.sessionKey === params.route.mainSessionKey && + inboundLastRouteSessionKey === params.route.mainSessionKey && shouldUpdateMainLastRoute ) { updateLastRouteInBackground({ @@ -357,7 +364,7 @@ export async function processMessage(params: { }); } else if ( dmRouteTarget && - params.route.sessionKey === params.route.mainSessionKey && + inboundLastRouteSessionKey === params.route.mainSessionKey && pinnedMainDmRecipient ) { logVerbose(