427 lines
14 KiB
TypeScript
427 lines
14 KiB
TypeScript
import { ChannelType } from "@buape/carbon";
|
||
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
|
||
import {
|
||
removeAckReactionAfterReply,
|
||
shouldAckReaction as shouldAckReactionGate,
|
||
} from "../../channels/ack-reactions.js";
|
||
import { logTypingFailure, logAckFailure } from "../../channels/logging.js";
|
||
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
|
||
import { createTypingCallbacks } from "../../channels/typing.js";
|
||
import {
|
||
formatInboundEnvelope,
|
||
formatThreadStarterEnvelope,
|
||
resolveEnvelopeFormatOptions,
|
||
} from "../../auto-reply/envelope.js";
|
||
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
|
||
import {
|
||
buildPendingHistoryContextFromMap,
|
||
clearHistoryEntriesIfEnabled,
|
||
} from "../../auto-reply/reply/history.js";
|
||
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
|
||
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
|
||
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||
import { recordInboundSession } from "../../channels/session.js";
|
||
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
|
||
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
|
||
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
|
||
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
|
||
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
|
||
import { truncateUtf16Safe } from "../../utils.js";
|
||
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
|
||
import { normalizeDiscordSlug } from "./allow-list.js";
|
||
import { formatDiscordUserTag, resolveTimestampMs } from "./format.js";
|
||
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
|
||
import {
|
||
buildDiscordMediaPayload,
|
||
resolveDiscordMessageText,
|
||
resolveMediaList,
|
||
} from "./message-utils.js";
|
||
import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js";
|
||
import { deliverDiscordReply } from "./reply-delivery.js";
|
||
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
|
||
import { sendTyping } from "./typing.js";
|
||
|
||
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
|
||
const {
|
||
cfg,
|
||
discordConfig,
|
||
accountId,
|
||
token,
|
||
runtime,
|
||
guildHistories,
|
||
historyLimit,
|
||
mediaMaxBytes,
|
||
textLimit,
|
||
replyToMode,
|
||
ackReactionScope,
|
||
message,
|
||
author,
|
||
data,
|
||
client,
|
||
channelInfo,
|
||
channelName,
|
||
isGuildMessage,
|
||
isDirectMessage,
|
||
isGroupDm,
|
||
baseText,
|
||
messageText,
|
||
shouldRequireMention,
|
||
canDetectMention,
|
||
effectiveWasMentioned,
|
||
shouldBypassMention,
|
||
threadChannel,
|
||
threadParentId,
|
||
threadParentName,
|
||
threadParentType,
|
||
threadName,
|
||
displayChannelSlug,
|
||
guildInfo,
|
||
guildSlug,
|
||
channelConfig,
|
||
baseSessionKey,
|
||
route,
|
||
commandAuthorized,
|
||
} = ctx;
|
||
|
||
const mediaList = await resolveMediaList(message, mediaMaxBytes);
|
||
const text = messageText;
|
||
if (!text) {
|
||
logVerbose(`discord: drop message ${message.id} (empty content)`);
|
||
return;
|
||
}
|
||
const ackReaction = resolveAckReaction(cfg, route.agentId);
|
||
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
|
||
const shouldAckReaction = () =>
|
||
Boolean(
|
||
ackReaction &&
|
||
shouldAckReactionGate({
|
||
scope: ackReactionScope,
|
||
isDirect: isDirectMessage,
|
||
isGroup: isGuildMessage || isGroupDm,
|
||
isMentionableGroup: isGuildMessage,
|
||
requireMention: Boolean(shouldRequireMention),
|
||
canDetectMention,
|
||
effectiveWasMentioned,
|
||
shouldBypassMention,
|
||
}),
|
||
);
|
||
const ackReactionPromise = shouldAckReaction()
|
||
? reactMessageDiscord(message.channelId, message.id, ackReaction, {
|
||
rest: client.rest,
|
||
}).then(
|
||
() => true,
|
||
(err) => {
|
||
logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`);
|
||
return false;
|
||
},
|
||
)
|
||
: null;
|
||
|
||
const fromLabel = isDirectMessage
|
||
? buildDirectLabel(author)
|
||
: buildGuildLabel({
|
||
guild: data.guild ?? undefined,
|
||
channelName: channelName ?? message.channelId,
|
||
channelId: message.channelId,
|
||
});
|
||
const senderTag = formatDiscordUserTag(author);
|
||
const senderDisplay = data.member?.nickname ?? author.globalName ?? author.username;
|
||
const senderLabel =
|
||
senderDisplay && senderTag && senderDisplay !== senderTag
|
||
? `${senderDisplay} (${senderTag})`
|
||
: (senderDisplay ?? senderTag ?? author.id);
|
||
const isForumParent =
|
||
threadParentType === ChannelType.GuildForum || threadParentType === ChannelType.GuildMedia;
|
||
const forumParentSlug =
|
||
isForumParent && threadParentName ? normalizeDiscordSlug(threadParentName) : "";
|
||
const isForumStarter =
|
||
threadChannel && isForumParent && forumParentSlug ? message.id === threadChannel.id : false;
|
||
const forumContextLine = isForumStarter ? `[Forum parent: #${forumParentSlug}]` : null;
|
||
const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
|
||
const groupSubject = isDirectMessage ? undefined : groupChannel;
|
||
const channelDescription = channelInfo?.topic?.trim();
|
||
const systemPromptParts = [
|
||
channelDescription ? `Channel topic: ${channelDescription}` : null,
|
||
channelConfig?.systemPrompt?.trim() || null,
|
||
].filter((entry): entry is string => Boolean(entry));
|
||
const groupSystemPrompt =
|
||
systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;
|
||
const storePath = resolveStorePath(cfg.session?.store, {
|
||
agentId: route.agentId,
|
||
});
|
||
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
|
||
const previousTimestamp = readSessionUpdatedAt({
|
||
storePath,
|
||
sessionKey: route.sessionKey,
|
||
});
|
||
let combinedBody = formatInboundEnvelope({
|
||
channel: "Discord",
|
||
from: fromLabel,
|
||
timestamp: resolveTimestampMs(message.timestamp),
|
||
body: text,
|
||
chatType: isDirectMessage ? "direct" : "channel",
|
||
senderLabel,
|
||
previousTimestamp,
|
||
envelope: envelopeOptions,
|
||
});
|
||
const shouldIncludeChannelHistory =
|
||
!isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
|
||
if (shouldIncludeChannelHistory) {
|
||
combinedBody = buildPendingHistoryContextFromMap({
|
||
historyMap: guildHistories,
|
||
historyKey: message.channelId,
|
||
limit: historyLimit,
|
||
currentMessage: combinedBody,
|
||
formatEntry: (entry) =>
|
||
formatInboundEnvelope({
|
||
channel: "Discord",
|
||
from: fromLabel,
|
||
timestamp: entry.timestamp,
|
||
body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`,
|
||
chatType: "channel",
|
||
senderLabel: entry.sender,
|
||
envelope: envelopeOptions,
|
||
}),
|
||
});
|
||
}
|
||
const replyContext = resolveReplyContext(message, resolveDiscordMessageText, {
|
||
envelope: envelopeOptions,
|
||
});
|
||
if (replyContext) {
|
||
combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
|
||
}
|
||
if (forumContextLine) {
|
||
combinedBody = `${combinedBody}\n${forumContextLine}`;
|
||
}
|
||
|
||
let threadStarterBody: string | undefined;
|
||
let threadLabel: string | undefined;
|
||
let parentSessionKey: string | undefined;
|
||
if (threadChannel) {
|
||
const starter = await resolveDiscordThreadStarter({
|
||
channel: threadChannel,
|
||
client,
|
||
parentId: threadParentId,
|
||
parentType: threadParentType,
|
||
resolveTimestampMs,
|
||
});
|
||
if (starter?.text) {
|
||
const starterEnvelope = formatThreadStarterEnvelope({
|
||
channel: "Discord",
|
||
author: starter.author,
|
||
timestamp: starter.timestamp,
|
||
body: starter.text,
|
||
envelope: envelopeOptions,
|
||
});
|
||
threadStarterBody = starterEnvelope;
|
||
}
|
||
const parentName = threadParentName ?? "parent";
|
||
threadLabel = threadName
|
||
? `Discord thread #${normalizeDiscordSlug(parentName)} › ${threadName}`
|
||
: `Discord thread #${normalizeDiscordSlug(parentName)}`;
|
||
if (threadParentId) {
|
||
parentSessionKey = buildAgentSessionKey({
|
||
agentId: route.agentId,
|
||
channel: route.channel,
|
||
peer: { kind: "channel", id: threadParentId },
|
||
});
|
||
}
|
||
}
|
||
const mediaPayload = buildDiscordMediaPayload(mediaList);
|
||
const threadKeys = resolveThreadSessionKeys({
|
||
baseSessionKey,
|
||
threadId: threadChannel ? message.channelId : undefined,
|
||
parentSessionKey,
|
||
useSuffix: false,
|
||
});
|
||
const replyPlan = await resolveDiscordAutoThreadReplyPlan({
|
||
client,
|
||
message,
|
||
isGuildMessage,
|
||
channelConfig,
|
||
threadChannel,
|
||
baseText: baseText ?? "",
|
||
combinedBody,
|
||
replyToMode,
|
||
agentId: route.agentId,
|
||
channel: route.channel,
|
||
});
|
||
const deliverTarget = replyPlan.deliverTarget;
|
||
const replyTarget = replyPlan.replyTarget;
|
||
const replyReference = replyPlan.replyReference;
|
||
const autoThreadContext = replyPlan.autoThreadContext;
|
||
|
||
const effectiveFrom = isDirectMessage
|
||
? `discord:${author.id}`
|
||
: (autoThreadContext?.From ?? `discord:channel:${message.channelId}`);
|
||
const effectiveTo = autoThreadContext?.To ?? replyTarget;
|
||
if (!effectiveTo) {
|
||
runtime.error?.(danger("discord: missing reply target"));
|
||
return;
|
||
}
|
||
|
||
const ctxPayload = finalizeInboundContext({
|
||
Body: combinedBody,
|
||
RawBody: baseText,
|
||
CommandBody: baseText,
|
||
From: effectiveFrom,
|
||
To: effectiveTo,
|
||
SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
|
||
AccountId: route.accountId,
|
||
ChatType: isDirectMessage ? "direct" : "channel",
|
||
ConversationLabel: fromLabel,
|
||
SenderName: data.member?.nickname ?? author.globalName ?? author.username,
|
||
SenderId: author.id,
|
||
SenderUsername: author.username,
|
||
SenderTag: formatDiscordUserTag(author),
|
||
GroupSubject: groupSubject,
|
||
GroupChannel: groupChannel,
|
||
GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
|
||
GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
|
||
Provider: "discord" as const,
|
||
Surface: "discord" as const,
|
||
WasMentioned: effectiveWasMentioned,
|
||
MessageSid: message.id,
|
||
ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
|
||
ThreadStarterBody: threadStarterBody,
|
||
ThreadLabel: threadLabel,
|
||
Timestamp: resolveTimestampMs(message.timestamp),
|
||
...mediaPayload,
|
||
CommandAuthorized: commandAuthorized,
|
||
CommandSource: "text" as const,
|
||
// Originating channel for reply routing.
|
||
OriginatingChannel: "discord" as const,
|
||
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
|
||
});
|
||
|
||
await recordInboundSession({
|
||
storePath,
|
||
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
|
||
ctx: ctxPayload,
|
||
updateLastRoute: isDirectMessage
|
||
? {
|
||
sessionKey: route.mainSessionKey,
|
||
channel: "discord",
|
||
to: `user:${author.id}`,
|
||
accountId: route.accountId,
|
||
}
|
||
: undefined,
|
||
onRecordError: (err) => {
|
||
logVerbose(`discord: failed updating session meta: ${String(err)}`);
|
||
},
|
||
});
|
||
|
||
if (shouldLogVerbose()) {
|
||
const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
|
||
logVerbose(
|
||
`discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
|
||
);
|
||
}
|
||
|
||
const typingChannelId = deliverTarget.startsWith("channel:")
|
||
? deliverTarget.slice("channel:".length)
|
||
: message.channelId;
|
||
|
||
const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });
|
||
const tableMode = resolveMarkdownTableMode({
|
||
cfg,
|
||
channel: "discord",
|
||
accountId,
|
||
});
|
||
|
||
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
|
||
responsePrefix: prefixContext.responsePrefix,
|
||
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
|
||
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
|
||
deliver: async (payload: ReplyPayload) => {
|
||
const replyToId = replyReference.use();
|
||
await deliverDiscordReply({
|
||
replies: [payload],
|
||
target: deliverTarget,
|
||
token,
|
||
accountId,
|
||
rest: client.rest,
|
||
runtime,
|
||
replyToId,
|
||
textLimit,
|
||
maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
|
||
tableMode,
|
||
});
|
||
replyReference.markSent();
|
||
},
|
||
onError: (err, info) => {
|
||
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
|
||
},
|
||
onReplyStart: createTypingCallbacks({
|
||
start: () => sendTyping({ client, channelId: typingChannelId }),
|
||
onStartError: (err) => {
|
||
logTypingFailure({
|
||
log: logVerbose,
|
||
channel: "discord",
|
||
target: typingChannelId,
|
||
error: err,
|
||
});
|
||
},
|
||
}).onReplyStart,
|
||
});
|
||
|
||
const { queuedFinal, counts } = await dispatchInboundMessage({
|
||
ctx: ctxPayload,
|
||
cfg,
|
||
dispatcher,
|
||
replyOptions: {
|
||
...replyOptions,
|
||
skillFilter: channelConfig?.skills,
|
||
disableBlockStreaming:
|
||
typeof discordConfig?.blockStreaming === "boolean"
|
||
? !discordConfig.blockStreaming
|
||
: undefined,
|
||
onModelSelected: (ctx) => {
|
||
prefixContext.onModelSelected(ctx);
|
||
},
|
||
},
|
||
});
|
||
markDispatchIdle();
|
||
if (!queuedFinal) {
|
||
if (isGuildMessage) {
|
||
clearHistoryEntriesIfEnabled({
|
||
historyMap: guildHistories,
|
||
historyKey: message.channelId,
|
||
limit: historyLimit,
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
if (shouldLogVerbose()) {
|
||
const finalCount = counts.final;
|
||
logVerbose(
|
||
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
|
||
);
|
||
}
|
||
removeAckReactionAfterReply({
|
||
removeAfterReply: removeAckAfterReply,
|
||
ackReactionPromise,
|
||
ackReactionValue: ackReaction,
|
||
remove: async () => {
|
||
await removeReactionDiscord(message.channelId, message.id, ackReaction, {
|
||
rest: client.rest,
|
||
});
|
||
},
|
||
onError: (err) => {
|
||
logAckFailure({
|
||
log: logVerbose,
|
||
channel: "discord",
|
||
target: `${message.channelId}/${message.id}`,
|
||
error: err,
|
||
});
|
||
},
|
||
});
|
||
if (isGuildMessage) {
|
||
clearHistoryEntriesIfEnabled({
|
||
historyMap: guildHistories,
|
||
historyKey: message.channelId,
|
||
limit: historyLimit,
|
||
});
|
||
}
|
||
}
|