import { timingSafeEqual } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import type { OpenClawConfig } from "openclaw/plugin-sdk";
import {
  registerWebhookTarget,
  rejectNonPostWebhookRequest,
  resolveWebhookTargets,
} from "openclaw/plugin-sdk";
import {
  normalizeWebhookMessage,
  normalizeWebhookReaction,
  type NormalizedWebhookMessage,
} from "./monitor-normalize.js";
import { logVerbose, processMessage, processReaction } from "./monitor-processing.js";
import {
  _resetBlueBubblesShortIdState,
  resolveBlueBubblesMessageId,
} from "./monitor-reply-cache.js";
import {
  DEFAULT_WEBHOOK_PATH,
  normalizeWebhookPath,
  resolveWebhookPathFromConfig,
  type BlueBubblesCoreRuntime,
  type BlueBubblesMonitorOptions,
  type WebhookTarget,
} from "./monitor-shared.js";
import { fetchBlueBubblesServerInfo } from "./probe.js";
import { getBlueBubblesRuntime } from "./runtime.js";

/**
 * Entry type for debouncing inbound messages.
 * Captures the normalized message and its target for later combined processing.
 */
type BlueBubblesDebounceEntry = {
  message: NormalizedWebhookMessage;
  target: WebhookTarget;
};

/**
 * Default debounce window for inbound message coalescing (ms).
 * This helps combine URL text + link preview balloon messages that BlueBubbles
 * sends as separate webhook events when no explicit inbound debounce config exists.
 */
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;

/** Upper bound on the webhook request body size accepted by the handler (1 MiB). */
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;

/** Event types the webhook handler acts on; any other non-empty type is acknowledged and ignored. */
const ALLOWED_EVENT_TYPES: ReadonlySet<string> = new Set([
  "new-message",
  "updated-message",
  "message-reaction",
  "reaction",
]);

/** Event types that are only meaningful when the payload normalizes to a reaction. */
const REACTION_ONLY_EVENT_TYPES: ReadonlySet<string> = new Set([
  "updated-message",
  "message-reaction",
  "reaction",
]);

/**
 * Combines multiple debounced messages into a single message for processing.
 * Used when multiple webhook events arrive within the debounce window.
 *
 * @param entries - Entries collected for one debounce key, in arrival order.
 * @returns The single entry's message unchanged, or a merged message based on the first entry.
 * @throws Error when `entries` is empty.
 */
function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage {
  const [firstEntry] = entries;
  if (!firstEntry) {
    throw new Error("Cannot combine empty entries");
  }
  if (entries.length === 1) {
    return firstEntry.message;
  }

  // Use the first message as the base (typically the text message).
  const first = firstEntry.message;

  // Combine text from all entries, filtering out duplicates and empty strings.
  const seenTexts = new Set<string>();
  const textParts: string[] = [];
  for (const entry of entries) {
    const text = entry.message.text.trim();
    if (!text) {
      continue;
    }
    // Skip duplicate text (URL might be in both text message and balloon).
    // Comparison is case-insensitive; the first-seen casing is kept.
    const normalizedText = text.toLowerCase();
    if (seenTexts.has(normalizedText)) {
      continue;
    }
    seenTexts.add(normalizedText);
    textParts.push(text);
  }

  // Merge attachments from all entries.
  const allAttachments = entries.flatMap((e) => e.message.attachments ?? []);

  // Use the latest numeric timestamp; fall back to the first entry's value when none is numeric.
  const timestamps = entries
    .map((e) => e.message.timestamp)
    .filter((t): t is number => typeof t === "number");
  const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp;

  // Collect all message IDs for reference.
  const messageIds = entries
    .map((e) => e.message.messageId)
    .filter((id): id is string => Boolean(id));

  // Prefer reply context from any entry that has it.
  const entryWithReply = entries.find((e) => e.message.replyToId);

  return {
    ...first,
    text: textParts.join(" "),
    attachments: allAttachments.length > 0 ? allAttachments : first.attachments,
    timestamp: latestTimestamp,
    // Use the first available message ID as primary (for reply reference); others are coalesced.
    messageId: messageIds[0] ?? first.messageId,
    // Preserve reply context if present.
    replyToId: entryWithReply?.message.replyToId ?? first.replyToId,
    replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody,
    replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender,
    // Clear balloonBundleId since we've combined (the combined message is no longer just a balloon).
    balloonBundleId: undefined,
  };
}

/** Registered webhook targets, grouped by webhook path. */
const webhookTargets = new Map<string, WebhookTarget[]>();

/** Minimal debouncer surface this module relies on. */
type BlueBubblesDebouncer = {
  enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
  flushKey: (key: string) => Promise<void>;
};

/**
 * Maps webhook targets to their inbound debouncers.
 * Each target object gets its own debouncer (keyed by object identity).
 */
const targetDebouncers = new Map<WebhookTarget, BlueBubblesDebouncer>();

/**
 * Resolves the inbound debounce window for BlueBubbles.
 *
 * Returns {@link DEFAULT_INBOUND_DEBOUNCE_MS} unless the config carries an explicit numeric
 * `messages.inbound.debounceMs` or `messages.inbound.byChannel.bluebubbles`, in which case
 * resolution is delegated to the core runtime.
 */
function resolveBlueBubblesDebounceMs(
  config: OpenClawConfig,
  core: BlueBubblesCoreRuntime,
): number {
  const inbound = config.messages?.inbound;
  const hasExplicitDebounce =
    typeof inbound?.debounceMs === "number" ||
    typeof inbound?.byChannel?.bluebubbles === "number";
  if (!hasExplicitDebounce) {
    return DEFAULT_INBOUND_DEBOUNCE_MS;
  }
  return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" });
}

/**
 * Creates or retrieves a debouncer for a webhook target.
 *
 * The debouncer is cached in {@link targetDebouncers} so repeated webhooks for the same target
 * share one instance.
 */
function getOrCreateDebouncer(target: WebhookTarget): BlueBubblesDebouncer {
  const existing = targetDebouncers.get(target);
  if (existing) {
    return existing;
  }

  const { account, config, runtime, core } = target;

  const debouncer: BlueBubblesDebouncer = core.channel.debounce.createInboundDebouncer({
    debounceMs: resolveBlueBubblesDebounceMs(config, core),
    buildKey: (entry: BlueBubblesDebounceEntry) => {
      const msg = entry.message;
      // Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
      // same message (e.g., text-only then text+attachment).
      //
      // For balloons (URL previews, stickers, etc), BlueBubbles often uses a different
      // messageId than the originating text. When present, key by associatedMessageGuid
      // to keep text + balloon coalescing working.
      const balloonBundleId = msg.balloonBundleId?.trim();
      const associatedMessageGuid = msg.associatedMessageGuid?.trim();
      if (balloonBundleId && associatedMessageGuid) {
        return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`;
      }

      const messageId = msg.messageId?.trim();
      if (messageId) {
        return `bluebubbles:${account.accountId}:msg:${messageId}`;
      }

      // `||` (not `??`) so that an empty or whitespace-only identifier falls through to the
      // next candidate instead of producing an empty key segment.
      const chatKey =
        msg.chatGuid?.trim() ||
        msg.chatIdentifier?.trim() ||
        (msg.chatId ? String(msg.chatId) : "dm");
      return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`;
    },
    shouldDebounce: (entry: BlueBubblesDebounceEntry) => {
      const msg = entry.message;
      // Skip debouncing for from-me messages (they're just cached, not processed).
      if (msg.fromMe) {
        return false;
      }
      // Skip debouncing for control commands - process immediately.
      if (core.channel.text.hasControlCommand(msg.text, config)) {
        return false;
      }
      // Debounce all other messages to coalesce rapid-fire webhook events
      // (e.g., text+image arriving as separate webhooks for the same messageId).
      return true;
    },
    onFlush: async (entries: BlueBubblesDebounceEntry[]) => {
      const [firstEntry] = entries;
      if (!firstEntry) {
        return;
      }

      // Use target from first entry (all entries have same target due to key structure).
      const flushTarget = firstEntry.target;

      if (entries.length === 1) {
        // Single message - process normally.
        await processMessage(firstEntry.message, flushTarget);
        return;
      }

      // Multiple messages - combine and process.
      const combined = combineDebounceEntries(entries);

      if (core.logging.shouldLogVerbose()) {
        const count = entries.length;
        const preview = combined.text.slice(0, 50);
        runtime.log?.(
          `[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`,
        );
      }

      await processMessage(combined, flushTarget);
    },
    onError: (err: unknown) => {
      runtime.error?.(`[${account.accountId}] [bluebubbles] debounce flush failed: ${String(err)}`);
    },
  });

  targetDebouncers.set(target, debouncer);
  return debouncer;
}

/**
 * Removes a debouncer for a target (called during unregistration).
 */
function removeDebouncer(target: WebhookTarget): void {
  targetDebouncers.delete(target);
}

/**
 * Registers a webhook target and returns a function that undoes the registration.
 *
 * @param target - Target to add to the module-level registry.
 * @returns Callback that unregisters the target and drops its cached debouncer.
 */
export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => void {
  const registered = registerWebhookTarget(webhookTargets, target);
  return () => {
    registered.unregister();
    // Clean up debouncer when target is unregistered.
    removeDebouncer(registered.target);
  };
}

/** Outcome of reading and parsing a webhook request body. */
type JsonBodyResult = { ok: boolean; value?: unknown; error?: string };

/**
 * Reads the request body and parses it as JSON.
 *
 * If the raw body is not valid JSON it is re-read as URL-encoded form data and the first of the
 * `payload`, `data` or `message` fields is parsed as JSON instead. The returned promise never
 * rejects; failures are reported through `ok: false` plus an `error` string
 * ("payload too large", "request body timeout", "empty payload", "connection closed",
 * "invalid json", or the underlying error message).
 *
 * @param req - Incoming request whose body is consumed.
 * @param maxBytes - Maximum accepted body size; the request is destroyed once exceeded.
 * @param timeoutMs - Time allowed for the body to arrive before the request is destroyed.
 */
async function readJsonBody(
  req: IncomingMessage,
  maxBytes: number,
  timeoutMs = 30_000,
): Promise<JsonBodyResult> {
  const chunks: Buffer[] = [];
  let total = 0;

  return await new Promise<JsonBodyResult>((resolve) => {
    let done = false;

    // Settles the promise exactly once; later events (e.g. "close" after "end") are ignored.
    const finish = (result: JsonBodyResult) => {
      if (done) {
        return;
      }
      done = true;
      clearTimeout(timer);
      resolve(result);
    };

    const timer = setTimeout(() => {
      finish({ ok: false, error: "request body timeout" });
      req.destroy();
    }, timeoutMs);

    req.on("data", (chunk: Buffer) => {
      total += chunk.length;
      if (total > maxBytes) {
        finish({ ok: false, error: "payload too large" });
        req.destroy();
        return;
      }
      chunks.push(chunk);
    });

    req.on("end", () => {
      try {
        const raw = Buffer.concat(chunks).toString("utf8");
        if (!raw.trim()) {
          finish({ ok: false, error: "empty payload" });
          return;
        }
        try {
          finish({ ok: true, value: JSON.parse(raw) as unknown });
          return;
        } catch {
          // Not plain JSON: fall back to a form-encoded body carrying JSON in a known field.
          const params = new URLSearchParams(raw);
          const payload = params.get("payload") ?? params.get("data") ?? params.get("message");
          if (payload) {
            finish({ ok: true, value: JSON.parse(payload) as unknown });
            return;
          }
          throw new Error("invalid json");
        }
      } catch (err) {
        finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
      }
    });

    req.on("error", (err) => {
      finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
    });

    req.on("close", () => {
      finish({ ok: false, error: "connection closed" });
    });
  });
}

/** Returns `value` as a plain record when it is a non-null, non-array object; otherwise `null`. */
function asRecord(value: unknown): Record<string, unknown> | null {
  return value && typeof value === "object" && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : null;
}

/** Masks a secret for logging: keeps the first and last two characters, or `***` if 6 chars or fewer. */
function maskSecret(value: string): string {
  if (value.length <= 6) {
    return "***";
  }
  return `${value.slice(0, 2)}***${value.slice(-2)}`;
}

/** Trims a raw credential and strips a leading case-insensitive `Bearer ` prefix. */
function normalizeAuthToken(raw: string): string {
  const value = raw.trim();
  if (!value) {
    return "";
  }
  if (value.toLowerCase().startsWith("bearer ")) {
    return value.slice("bearer ".length).trim();
  }
  return value;
}

/**
 * Compares two credentials after normalization using a timing-safe byte comparison.
 * Empty values never match, and values of different byte length short-circuit to `false`
 * (`timingSafeEqual` requires equal-length inputs).
 */
function safeEqualSecret(aRaw: string, bRaw: string): boolean {
  const a = normalizeAuthToken(aRaw);
  const b = normalizeAuthToken(bRaw);
  if (!a || !b) {
    return false;
  }
  const bufA = Buffer.from(a, "utf8");
  const bufB = Buffer.from(b, "utf8");
  if (bufA.length !== bufB.length) {
    return false;
  }
  return timingSafeEqual(bufA, bufB);
}

/**
 * Extracts the lower-cased host name (without port) from a `Host` header value.
 * Returns an empty string when the header is missing or blank.
 */
function getHostName(hostHeader?: string | string[]): string {
  const host = (Array.isArray(hostHeader) ? hostHeader[0] : (hostHeader ?? ""))
    .trim()
    .toLowerCase();
  if (!host) {
    return "";
  }
  // Bracketed IPv6: [::1]:18789
  if (host.startsWith("[")) {
    const end = host.indexOf("]");
    if (end !== -1) {
      return host.slice(1, end);
    }
  }
  const [name] = host.split(":");
  return name ?? "";
}

/**
 * Reports whether the request comes straight from the local machine: a loopback remote address,
 * a local `Host` header, and no proxy forwarding headers.
 */
function isDirectLocalLoopbackRequest(req: IncomingMessage): boolean {
  const remote = (req.socket?.remoteAddress ?? "").trim().toLowerCase();
  const remoteIsLoopback =
    remote === "127.0.0.1" || remote === "::1" || remote === "::ffff:127.0.0.1";
  if (!remoteIsLoopback) {
    return false;
  }

  const host = getHostName(req.headers?.host);
  const hostIsLocal = host === "localhost" || host === "127.0.0.1" || host === "::1";
  if (!hostIsLocal) {
    return false;
  }

  // If a reverse proxy is in front, it will usually inject forwarding headers.
  // Passwordless webhooks must never be accepted through a proxy.
  // The standard `Forwarded` header (RFC 7239) is checked alongside the `X-` variants.
  const hasForwarded = Boolean(
    req.headers?.["x-forwarded-for"] ||
      req.headers?.["x-real-ip"] ||
      req.headers?.["x-forwarded-host"] ||
      req.headers?.forwarded,
  );
  return !hasForwarded;
}

/**
 * HTTP entry point for BlueBubbles webhooks.
 *
 * Flow: resolve targets for the request path, reject non-POST requests, read and parse the body,
 * ignore unsupported event types, normalize the payload into a reaction or message, authenticate
 * against the candidate targets, then dispatch. Reactions are processed directly; messages go
 * through the per-target debouncer. Dispatch is fire-and-forget: the HTTP response is sent
 * without waiting for processing to finish.
 *
 * NOTE(review): the body is read and parsed before the credential check, so unauthenticated
 * callers can make the server buffer up to {@link MAX_WEBHOOK_BODY_BYTES}. Consider authenticating
 * first if that matters for the deployment.
 *
 * @returns `false` when no webhook target is registered for the request (so the caller can try
 *   other handlers); `true` once a response has been written.
 */
export async function handleBlueBubblesWebhookRequest(
  req: IncomingMessage,
  res: ServerResponse,
): Promise<boolean> {
  const resolved = resolveWebhookTargets(req, webhookTargets);
  if (!resolved) {
    return false;
  }
  const { path, targets } = resolved;
  const url = new URL(req.url ?? "/", "http://localhost");

  if (rejectNonPostWebhookRequest(req, res)) {
    return true;
  }

  const body = await readJsonBody(req, MAX_WEBHOOK_BODY_BYTES);
  if (!body.ok) {
    if (body.error === "payload too large") {
      res.statusCode = 413;
    } else if (body.error === "request body timeout") {
      res.statusCode = 408;
    } else {
      res.statusCode = 400;
    }
    res.end(body.error ?? "invalid payload");
    console.warn(`[bluebubbles] webhook rejected: ${body.error ?? "invalid payload"}`);
    return true;
  }

  const payload = asRecord(body.value) ?? {};
  // Before authentication no specific target is known, so pre-auth verbose logs use the first
  // candidate registered for this path.
  const firstTarget = targets[0];
  if (firstTarget) {
    logVerbose(
      firstTarget.core,
      firstTarget.runtime,
      `webhook received path=${path} keys=${Object.keys(payload).join(",") || "none"}`,
    );
  }

  const eventTypeRaw = payload.type;
  const eventType = typeof eventTypeRaw === "string" ? eventTypeRaw.trim() : "";
  // A missing/blank type is let through; only explicit, unsupported types are ignored.
  if (eventType && !ALLOWED_EVENT_TYPES.has(eventType)) {
    res.statusCode = 200;
    res.end("ok");
    if (firstTarget) {
      logVerbose(firstTarget.core, firstTarget.runtime, `webhook ignored type=${eventType}`);
    }
    return true;
  }

  const reaction = normalizeWebhookReaction(payload);
  if (REACTION_ONLY_EVENT_TYPES.has(eventType) && !reaction) {
    res.statusCode = 200;
    res.end("ok");
    if (firstTarget) {
      logVerbose(
        firstTarget.core,
        firstTarget.runtime,
        `webhook ignored ${eventType || "event"} without reaction`,
      );
    }
    return true;
  }

  const message = reaction ? null : normalizeWebhookMessage(payload);
  if (!message && !reaction) {
    res.statusCode = 400;
    res.end("invalid payload");
    console.warn("[bluebubbles] webhook rejected: unable to parse message payload");
    return true;
  }

  // Credential lookup: a header value wins over the query-string value.
  const guidParam = url.searchParams.get("guid") ?? url.searchParams.get("password");
  const headerToken =
    req.headers["x-guid"] ??
    req.headers["x-password"] ??
    req.headers["x-bluebubbles-guid"] ??
    req.headers["authorization"];
  const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? "";

  const strictMatches: WebhookTarget[] = [];
  const passwordlessTargets: WebhookTarget[] = [];
  for (const target of targets) {
    const token = target.account.config.password?.trim() ?? "";
    if (!token) {
      passwordlessTargets.push(target);
      continue;
    }
    if (safeEqualSecret(guid, token)) {
      strictMatches.push(target);
      // Two matches are already enough to declare the request ambiguous.
      if (strictMatches.length > 1) {
        break;
      }
    }
  }

  // Password matches take priority; passwordless targets are only eligible for direct
  // loopback requests.
  const matching =
    strictMatches.length > 0
      ? strictMatches
      : isDirectLocalLoopbackRequest(req)
        ? passwordlessTargets
        : [];

  if (matching.length === 0) {
    res.statusCode = 401;
    res.end("unauthorized");
    // Only the query-string credential is echoed (masked); header credentials are not logged.
    console.warn(
      `[bluebubbles] webhook rejected: unauthorized guid=${maskSecret(guidParam ?? "")}`,
    );
    return true;
  }

  if (matching.length > 1) {
    res.statusCode = 401;
    res.end("ambiguous webhook target");
    console.warn(`[bluebubbles] webhook rejected: ambiguous target match path=${path}`);
    return true;
  }

  const target = matching[0];
  target.statusSink?.({ lastInboundAt: Date.now() });

  if (reaction) {
    processReaction(reaction, target).catch((err: unknown) => {
      target.runtime.error?.(
        `[${target.account.accountId}] BlueBubbles reaction failed: ${String(err)}`,
      );
    });
  } else if (message) {
    // Route messages through debouncer to coalesce rapid-fire events
    // (e.g., text message + URL balloon arriving as separate webhooks).
    const debouncer = getOrCreateDebouncer(target);
    debouncer.enqueue({ message, target }).catch((err: unknown) => {
      target.runtime.error?.(
        `[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,
      );
    });
  }

  res.statusCode = 200;
  res.end("ok");

  // Acceptance is logged through the matched target's runtime (not merely the first candidate).
  if (reaction) {
    logVerbose(
      target.core,
      target.runtime,
      `webhook accepted reaction sender=${reaction.senderId} msg=${reaction.messageId} action=${reaction.action}`,
    );
  } else if (message) {
    logVerbose(
      target.core,
      target.runtime,
      `webhook accepted sender=${message.senderId} group=${message.isGroup} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`,
    );
  }
  return true;
}

/**
 * Starts monitoring BlueBubbles webhooks for one account.
 *
 * Probes the server for info (failures are ignored), registers a webhook target, and resolves
 * once `abortSignal` fires, unregistering the target at that point. If the signal is already
 * aborted, the target is unregistered immediately.
 *
 * @param options - Account, config, runtime and optional webhook path / abort signal / status sink.
 */
export async function monitorBlueBubblesProvider(
  options: BlueBubblesMonitorOptions,
): Promise<void> {
  const { account, config, runtime, abortSignal, statusSink } = options;
  const core = getBlueBubblesRuntime();
  const path = options.webhookPath?.trim() || DEFAULT_WEBHOOK_PATH;

  // Fetch and cache server info (for macOS version detection in action gating).
  // A failed probe is non-fatal: monitoring continues without the info.
  const serverInfo = await fetchBlueBubblesServerInfo({
    baseUrl: account.baseUrl,
    password: account.config.password,
    accountId: account.accountId,
    timeoutMs: 5000,
  }).catch(() => null);
  if (serverInfo?.os_version) {
    runtime.log?.(`[${account.accountId}] BlueBubbles server macOS ${serverInfo.os_version}`);
  }
  if (typeof serverInfo?.private_api === "boolean") {
    runtime.log?.(
      `[${account.accountId}] BlueBubbles Private API ${serverInfo.private_api ? "enabled" : "disabled"}`,
    );
  }

  const unregister = registerBlueBubblesWebhookTarget({
    account,
    config,
    runtime,
    core,
    path,
    statusSink,
  });

  return await new Promise<void>((resolve) => {
    const stop = () => {
      unregister();
      resolve();
    };

    if (abortSignal?.aborted) {
      stop();
      return;
    }

    abortSignal?.addEventListener("abort", stop, { once: true });
    runtime.log?.(
      `[${account.accountId}] BlueBubbles webhook listening on ${normalizeWebhookPath(path)}`,
    );
  });
}

export { _resetBlueBubblesShortIdState, resolveBlueBubblesMessageId, resolveWebhookPathFromConfig };