feat(slack): track thread participation for auto-reply without @mention (#29165)

* feat(slack): track thread participation for auto-reply without @mention

* fix(slack): scope thread participation cache by accountId and capture actual reply thread ts

* fix(slack): capture reply thread ts from all delivery paths and only after success

* Slack: add changelog for thread participation cache behavior

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
Luis Conde
2026-03-01 12:42:12 -04:00
committed by GitHub
parent dfbdab5a29
commit bd78a74298
6 changed files with 170 additions and 8 deletions

View File

@@ -239,6 +239,7 @@ Docs: https://docs.openclaw.ai
- Slack/Identity: thread agent outbound identity (`chat:write.customize` overrides) through the channel reply delivery path so per-agent username, icon URL, and icon emoji are applied to all Slack replies including media messages. (#27134) Thanks @hou-rong.
- Slack/Threading: resolve `replyToMode` per incoming message using chat-type-aware account config (`replyToModeByChatType` and legacy `dm.replyToMode`) so DM/channel reply threading honors overrides instead of always using monitor startup defaults. (#24717) Thanks @dbachelder.
- Slack/Threading: track bot participation in message threads (per account/channel/thread) so follow-up messages in those threads can be handled without requiring repeated @mentions, while preserving mention-gating behavior for unrelated threads. (#29165) Thanks @luijoc.
- Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808.
- Telegram/Webhook: pre-initialize webhook bots, switch webhook processing to callback-mode JSON handling, and preserve full near-limit payload reads under delayed handlers to prevent webhook request hangs and dropped updates. (#26156)
- Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl.

View File

@@ -17,6 +17,7 @@ import {
unpinSlackMessage,
} from "../../slack/actions.js";
import { parseSlackBlocksInput } from "../../slack/blocks-input.js";
import { recordSlackThreadParticipation } from "../../slack/sent-thread-cache.js";
import { parseSlackTarget, resolveSlackChannelId } from "../../slack/targets.js";
import { withNormalizedTimestamp } from "../date-time.js";
import {
@@ -63,7 +64,9 @@ function resolveThreadTsFromContext(
return undefined;
}
const parsedTarget = parseSlackTarget(targetChannel, { defaultKind: "channel" });
const parsedTarget = parseSlackTarget(targetChannel, {
defaultKind: "channel",
});
if (!parsedTarget || parsedTarget.kind !== "channel") {
return undefined;
}
@@ -179,7 +182,9 @@ export async function handleSlackAction(
switch (action) {
case "sendMessage": {
const to = readStringParam(params, "to", { required: true });
const content = readStringParam(params, "content", { allowEmpty: true });
const content = readStringParam(params, "content", {
allowEmpty: true,
});
const mediaUrl = readStringParam(params, "mediaUrl");
const blocks = readSlackBlocksParam(params);
if (!content && !mediaUrl && !blocks) {
@@ -200,6 +205,10 @@ export async function handleSlackAction(
blocks,
});
if (threadTs && result.channelId && account.accountId) {
recordSlackThreadParticipation(account.accountId, result.channelId, threadTs);
}
// Keep "first" mode consistent even when the agent explicitly provided
// threadTs: once we send a message to the current channel, consider the
// first reply "used" so later tool calls don't auto-thread again.
@@ -217,7 +226,9 @@ export async function handleSlackAction(
const messageId = readStringParam(params, "messageId", {
required: true,
});
const content = readStringParam(params, "content", { allowEmpty: true });
const content = readStringParam(params, "content", {
allowEmpty: true,
});
const blocks = readSlackBlocksParam(params);
if (!content && !blocks) {
throw new Error("Slack editMessage requires content or blocks.");
@@ -228,7 +239,9 @@ export async function handleSlackAction(
blocks,
});
} else {
await editSlackMessage(channelId, messageId, content ?? "", { blocks });
await editSlackMessage(channelId, messageId, content ?? "", {
blocks,
});
}
return jsonResult({ ok: true });
}
@@ -336,7 +349,10 @@ export async function handleSlackAction(
if (entries.length > limit) {
return jsonResult({
ok: true,
emojis: { ...result, emoji: Object.fromEntries(entries.slice(0, limit)) },
emojis: {
...result,
emoji: Object.fromEntries(entries.slice(0, limit)),
},
});
}
}

View File

@@ -12,6 +12,7 @@ import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { resolveAgentOutboundIdentity } from "../../../infra/outbound/identity.js";
import { removeSlackReaction } from "../../actions.js";
import { createSlackDraftStream } from "../../draft-stream.js";
import { recordSlackThreadParticipation } from "../../sent-thread-cache.js";
import {
applyAppendOnlyStreamUpdate,
buildStatusFinalPreviewText,
@@ -189,6 +190,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
let usedReplyThreadTs: string | undefined;
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs();
@@ -203,6 +205,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyToMode: prepared.replyToMode,
...(slackIdentity ? { identity: slackIdentity } : {}),
});
// Record the thread ts only after confirmed delivery success.
if (replyThreadTs) {
usedReplyThreadTs ??= replyThreadTs;
}
replyPlan.markSent();
};
@@ -235,6 +241,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
teamId: ctx.teamId,
userId: message.user,
});
usedReplyThreadTs ??= streamThreadTs;
replyPlan.markSent();
return;
}
@@ -324,7 +331,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
token: ctx.botToken,
accountId: account.accountId,
maxChars: Math.min(ctx.textLimit, 4000),
resolveThreadTs: () => replyPlan.nextThreadTs(),
resolveThreadTs: () => {
const ts = replyPlan.nextThreadTs();
if (ts) {
usedReplyThreadTs ??= ts;
}
return ts;
},
onMessageSent: () => replyPlan.markSent(),
log: logVerbose,
warn: logVerbose,
@@ -425,6 +438,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;
// Record thread participation only when we actually delivered a reply and
// know the thread ts that was used (set by deliverNormally, streaming start,
// or draft stream). Falls back to statusThreadTs for edge cases.
const participationThreadTs = usedReplyThreadTs ?? statusThreadTs;
if (anyReplyDelivered && participationThreadTs) {
recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs);
}
if (!anyReplyDelivered) {
await draftStream.clear();
if (prepared.isRoomish) {

View File

@@ -32,6 +32,7 @@ import { resolveThreadSessionKeys } from "../../../routing/session-key.js";
import { resolveSlackReplyToMode, type ResolvedSlackAccount } from "../../accounts.js";
import { reactSlackMessage } from "../../actions.js";
import { sendMessageSlack } from "../../send.js";
import { hasSlackThreadParticipation } from "../../sent-thread-cache.js";
import { resolveSlackThreadContext } from "../../threading.js";
import type { SlackMessageEvent } from "../../types.js";
import { resolveSlackAllowListMatch, resolveSlackUserAllowed } from "../allow-list.js";
@@ -210,7 +211,8 @@ export async function prepareSlackMessage(params: {
!isDirectMessage &&
ctx.botUserId &&
message.thread_ts &&
message.parent_user_id === ctx.botUserId,
(message.parent_user_id === ctx.botUserId ||
hasSlackThreadParticipation(account.accountId, message.channel, message.thread_ts)),
);
const sender = message.user ? await ctx.resolveUserName(message.user) : null;
@@ -259,7 +261,10 @@ export async function prepareSlackMessage(params: {
useAccessGroups: ctx.useAccessGroups,
authorizers: [
{ configured: allowFromLower.length > 0, allowed: ownerAuthorized },
{ configured: channelUsersAllowlistConfigured, allowed: channelCommandAuthorized },
{
configured: channelUsersAllowlistConfigured,
allowed: channelCommandAuthorized,
},
],
allowTextCommands,
hasControlCommand: hasControlCommandInMessage,

View File

@@ -0,0 +1,58 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
clearSlackThreadParticipationCache,
hasSlackThreadParticipation,
recordSlackThreadParticipation,
} from "./sent-thread-cache.js";
describe("slack sent-thread-cache", () => {
afterEach(() => {
clearSlackThreadParticipationCache();
vi.restoreAllMocks();
});
it("records and checks thread participation", () => {
recordSlackThreadParticipation("A1", "C123", "1700000000.000001");
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(true);
});
it("returns false for unrecorded threads", () => {
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false);
});
it("distinguishes different channels and threads", () => {
recordSlackThreadParticipation("A1", "C123", "1700000000.000001");
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000002")).toBe(false);
expect(hasSlackThreadParticipation("A1", "C456", "1700000000.000001")).toBe(false);
});
it("scopes participation by accountId", () => {
recordSlackThreadParticipation("A1", "C123", "1700000000.000001");
expect(hasSlackThreadParticipation("A2", "C123", "1700000000.000001")).toBe(false);
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(true);
});
it("ignores empty accountId, channelId, or threadTs", () => {
recordSlackThreadParticipation("", "C123", "1700000000.000001");
recordSlackThreadParticipation("A1", "", "1700000000.000001");
recordSlackThreadParticipation("A1", "C123", "");
expect(hasSlackThreadParticipation("", "C123", "1700000000.000001")).toBe(false);
expect(hasSlackThreadParticipation("A1", "", "1700000000.000001")).toBe(false);
expect(hasSlackThreadParticipation("A1", "C123", "")).toBe(false);
});
it("clears all entries", () => {
recordSlackThreadParticipation("A1", "C123", "1700000000.000001");
recordSlackThreadParticipation("A1", "C456", "1700000000.000002");
clearSlackThreadParticipationCache();
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false);
expect(hasSlackThreadParticipation("A1", "C456", "1700000000.000002")).toBe(false);
});
it("expired entries return false and are cleaned up on read", () => {
recordSlackThreadParticipation("A1", "C123", "1700000000.000001");
// Advance time past the 24-hour TTL
vi.spyOn(Date, "now").mockReturnValue(Date.now() + 25 * 60 * 60 * 1000);
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false);
});
});

View File

@@ -0,0 +1,61 @@
/**
* In-memory cache of Slack threads the bot has participated in.
* Used to auto-respond in threads without requiring @mention after the first reply.
* Follows a similar TTL pattern to the MS Teams and Telegram sent-message caches.
*/
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
const MAX_ENTRIES = 5000;
const threadParticipation = new Map<string, number>();
function makeKey(accountId: string, channelId: string, threadTs: string): string {
return `${accountId}:${channelId}:${threadTs}`;
}
function evictExpired(): void {
const now = Date.now();
for (const [key, timestamp] of threadParticipation) {
if (now - timestamp > TTL_MS) {
threadParticipation.delete(key);
}
}
}
export function recordSlackThreadParticipation(
accountId: string,
channelId: string,
threadTs: string,
): void {
if (!accountId || !channelId || !threadTs) {
return;
}
if (threadParticipation.size >= MAX_ENTRIES) {
evictExpired();
}
threadParticipation.set(makeKey(accountId, channelId, threadTs), Date.now());
}
export function hasSlackThreadParticipation(
accountId: string,
channelId: string,
threadTs: string,
): boolean {
if (!accountId || !channelId || !threadTs) {
return false;
}
const key = makeKey(accountId, channelId, threadTs);
const timestamp = threadParticipation.get(key);
if (timestamp == null) {
return false;
}
if (Date.now() - timestamp > TTL_MS) {
threadParticipation.delete(key);
return false;
}
return true;
}
export function clearSlackThreadParticipationCache(): void {
threadParticipation.clear();
}