diff --git a/CHANGELOG.md b/CHANGELOG.md index f42adebf8..3c5192d72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -239,6 +239,7 @@ Docs: https://docs.openclaw.ai - Slack/Identity: thread agent outbound identity (`chat:write.customize` overrides) through the channel reply delivery path so per-agent username, icon URL, and icon emoji are applied to all Slack replies including media messages. (#27134) Thanks @hou-rong. - Slack/Threading: resolve `replyToMode` per incoming message using chat-type-aware account config (`replyToModeByChatType` and legacy `dm.replyToMode`) so DM/channel reply threading honors overrides instead of always using monitor startup defaults. (#24717) Thanks @dbachelder. +- Slack/Threading: track bot participation in message threads (per account/channel/thread) so follow-up messages in those threads can be handled without requiring repeated @mentions, while preserving mention-gating behavior for unrelated threads. (#29165) Thanks @luijoc. - Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808. - Telegram/Webhook: pre-initialize webhook bots, switch webhook processing to callback-mode JSON handling, and preserve full near-limit payload reads under delayed handlers to prevent webhook request hangs and dropped updates. (#26156) - Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl. diff --git a/src/agents/tools/slack-actions.ts b/src/agents/tools/slack-actions.ts index 1350cb625..7eaa2dbfa 100644 --- a/src/agents/tools/slack-actions.ts +++ b/src/agents/tools/slack-actions.ts @@ -17,6 +17,7 @@ import { unpinSlackMessage, } from "../../slack/actions.js"; import { parseSlackBlocksInput } from "../../slack/blocks-input.js"; +import { recordSlackThreadParticipation } from "../../slack/sent-thread-cache.js"; import { parseSlackTarget, resolveSlackChannelId } from "../../slack/targets.js"; import { withNormalizedTimestamp } from "../date-time.js"; import { @@ -63,7 +64,9 @@ function resolveThreadTsFromContext( return undefined; } - const parsedTarget = parseSlackTarget(targetChannel, { defaultKind: "channel" }); + const parsedTarget = parseSlackTarget(targetChannel, { + defaultKind: "channel", + }); if (!parsedTarget || parsedTarget.kind !== "channel") { return undefined; } @@ -179,7 +182,9 @@ export async function handleSlackAction( switch (action) { case "sendMessage": { const to = readStringParam(params, "to", { required: true }); - const content = readStringParam(params, "content", { allowEmpty: true }); + const content = readStringParam(params, "content", { + allowEmpty: true, + }); const mediaUrl = readStringParam(params, "mediaUrl"); const blocks = readSlackBlocksParam(params); if (!content && !mediaUrl && !blocks) { @@ -200,6 +205,10 @@ export async function handleSlackAction( blocks, }); + if (threadTs && result.channelId && account.accountId) { + recordSlackThreadParticipation(account.accountId, result.channelId, threadTs); + } + // Keep "first" mode consistent even when the agent explicitly provided // threadTs: once we send a message to the current channel, consider the // first reply "used" so later tool calls don't auto-thread again. @@ -217,7 +226,9 @@ export async function handleSlackAction( const messageId = readStringParam(params, "messageId", { required: true, }); - const content = readStringParam(params, "content", { allowEmpty: true }); + const content = readStringParam(params, "content", { + allowEmpty: true, + }); const blocks = readSlackBlocksParam(params); if (!content && !blocks) { throw new Error("Slack editMessage requires content or blocks."); @@ -228,7 +239,9 @@ export async function handleSlackAction( blocks, }); } else { - await editSlackMessage(channelId, messageId, content ?? "", { blocks }); + await editSlackMessage(channelId, messageId, content ?? "", { + blocks, + }); } return jsonResult({ ok: true }); } @@ -336,7 +349,10 @@ export async function handleSlackAction( if (entries.length > limit) { return jsonResult({ ok: true, - emojis: { ...result, emoji: Object.fromEntries(entries.slice(0, limit)) }, + emojis: { + ...result, + emoji: Object.fromEntries(entries.slice(0, limit)), + }, }); } } diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index a7a852ea1..8e3db47d5 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -12,6 +12,7 @@ import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { resolveAgentOutboundIdentity } from "../../../infra/outbound/identity.js"; import { removeSlackReaction } from "../../actions.js"; import { createSlackDraftStream } from "../../draft-stream.js"; +import { recordSlackThreadParticipation } from "../../sent-thread-cache.js"; import { applyAppendOnlyStreamUpdate, buildStatusFinalPreviewText, @@ -189,6 +190,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }); let streamSession: SlackStreamSession | null = null; let streamFailed = false; + let usedReplyThreadTs: string | undefined; const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise => { const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs(); @@ -203,6 +205,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag replyToMode: prepared.replyToMode, ...(slackIdentity ? { identity: slackIdentity } : {}), }); + // Record the thread ts only after confirmed delivery success. + if (replyThreadTs) { + usedReplyThreadTs ??= replyThreadTs; + } replyPlan.markSent(); }; @@ -235,6 +241,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag teamId: ctx.teamId, userId: message.user, }); + usedReplyThreadTs ??= streamThreadTs; replyPlan.markSent(); return; } @@ -324,7 +331,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag token: ctx.botToken, accountId: account.accountId, maxChars: Math.min(ctx.textLimit, 4000), - resolveThreadTs: () => replyPlan.nextThreadTs(), + resolveThreadTs: () => { + const ts = replyPlan.nextThreadTs(); + if (ts) { + usedReplyThreadTs ??= ts; + } + return ts; + }, onMessageSent: () => replyPlan.markSent(), log: logVerbose, warn: logVerbose, @@ -425,6 +438,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0; + // Record thread participation only when we actually delivered a reply and + // know the thread ts that was used (set by deliverNormally, streaming start, + // or draft stream). Falls back to statusThreadTs for edge cases. + const participationThreadTs = usedReplyThreadTs ?? statusThreadTs; + if (anyReplyDelivered && participationThreadTs) { + recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs); + } + if (!anyReplyDelivered) { await draftStream.clear(); if (prepared.isRoomish) { diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 9462ac767..875b23bd9 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -32,6 +32,7 @@ import { resolveThreadSessionKeys } from "../../../routing/session-key.js"; import { resolveSlackReplyToMode, type ResolvedSlackAccount } from "../../accounts.js"; import { reactSlackMessage } from "../../actions.js"; import { sendMessageSlack } from "../../send.js"; +import { hasSlackThreadParticipation } from "../../sent-thread-cache.js"; import { resolveSlackThreadContext } from "../../threading.js"; import type { SlackMessageEvent } from "../../types.js"; import { resolveSlackAllowListMatch, resolveSlackUserAllowed } from "../allow-list.js"; @@ -210,7 +211,8 @@ export async function prepareSlackMessage(params: { !isDirectMessage && ctx.botUserId && message.thread_ts && - message.parent_user_id === ctx.botUserId, + (message.parent_user_id === ctx.botUserId || + hasSlackThreadParticipation(account.accountId, message.channel, message.thread_ts)), ); const sender = message.user ? await ctx.resolveUserName(message.user) : null; @@ -259,7 +261,10 @@ export async function prepareSlackMessage(params: { useAccessGroups: ctx.useAccessGroups, authorizers: [ { configured: allowFromLower.length > 0, allowed: ownerAuthorized }, - { configured: channelUsersAllowlistConfigured, allowed: channelCommandAuthorized }, + { + configured: channelUsersAllowlistConfigured, + allowed: channelCommandAuthorized, + }, ], allowTextCommands, hasControlCommand: hasControlCommandInMessage, diff --git a/src/slack/sent-thread-cache.test.ts b/src/slack/sent-thread-cache.test.ts new file mode 100644 index 000000000..57acea9fa --- /dev/null +++ b/src/slack/sent-thread-cache.test.ts @@ -0,0 +1,58 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + clearSlackThreadParticipationCache, + hasSlackThreadParticipation, + recordSlackThreadParticipation, +} from "./sent-thread-cache.js"; + +describe("slack sent-thread-cache", () => { + afterEach(() => { + clearSlackThreadParticipationCache(); + vi.restoreAllMocks(); + }); + + it("records and checks thread participation", () => { + recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(true); + }); + + it("returns false for unrecorded threads", () => { + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false); + }); + + it("distinguishes different channels and threads", () => { + recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000002")).toBe(false); + expect(hasSlackThreadParticipation("A1", "C456", "1700000000.000001")).toBe(false); + }); + + it("scopes participation by accountId", () => { + recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + expect(hasSlackThreadParticipation("A2", "C123", "1700000000.000001")).toBe(false); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(true); + }); + + it("ignores empty accountId, channelId, or threadTs", () => { + recordSlackThreadParticipation("", "C123", "1700000000.000001"); + recordSlackThreadParticipation("A1", "", "1700000000.000001"); + recordSlackThreadParticipation("A1", "C123", ""); + expect(hasSlackThreadParticipation("", "C123", "1700000000.000001")).toBe(false); + expect(hasSlackThreadParticipation("A1", "", "1700000000.000001")).toBe(false); + expect(hasSlackThreadParticipation("A1", "C123", "")).toBe(false); + }); + + it("clears all entries", () => { + recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + recordSlackThreadParticipation("A1", "C456", "1700000000.000002"); + clearSlackThreadParticipationCache(); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false); + expect(hasSlackThreadParticipation("A1", "C456", "1700000000.000002")).toBe(false); + }); + + it("expired entries return false and are cleaned up on read", () => { + recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + // Advance time past the 24-hour TTL + vi.spyOn(Date, "now").mockReturnValue(Date.now() + 25 * 60 * 60 * 1000); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false); + }); +}); diff --git a/src/slack/sent-thread-cache.ts b/src/slack/sent-thread-cache.ts new file mode 100644 index 000000000..9cce41b07 --- /dev/null +++ b/src/slack/sent-thread-cache.ts @@ -0,0 +1,61 @@ +/** + * In-memory cache of Slack threads the bot has participated in. + * Used to auto-respond in threads without requiring @mention after the first reply. + * Follows a similar TTL pattern to the MS Teams and Telegram sent-message caches. + */ + +const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours +const MAX_ENTRIES = 5000; + +const threadParticipation = new Map(); + +function makeKey(accountId: string, channelId: string, threadTs: string): string { + return `${accountId}:${channelId}:${threadTs}`; +} + +function evictExpired(): void { + const now = Date.now(); + for (const [key, timestamp] of threadParticipation) { + if (now - timestamp > TTL_MS) { + threadParticipation.delete(key); + } + } +} + +export function recordSlackThreadParticipation( + accountId: string, + channelId: string, + threadTs: string, +): void { + if (!accountId || !channelId || !threadTs) { + return; + } + if (threadParticipation.size >= MAX_ENTRIES) { + evictExpired(); + } + threadParticipation.set(makeKey(accountId, channelId, threadTs), Date.now()); +} + +export function hasSlackThreadParticipation( + accountId: string, + channelId: string, + threadTs: string, +): boolean { + if (!accountId || !channelId || !threadTs) { + return false; + } + const key = makeKey(accountId, channelId, threadTs); + const timestamp = threadParticipation.get(key); + if (timestamp == null) { + return false; + } + if (Date.now() - timestamp > TTL_MS) { + threadParticipation.delete(key); + return false; + } + return true; +} + +export function clearSlackThreadParticipationCache(): void { + threadParticipation.clear(); +}