import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { createWebhookInFlightLimiter, createReplyPrefixOptions, registerWebhookTargetWithPluginRoute, resolveInboundRouteEnvelopeBuilderWithRuntime, resolveWebhookPath, } from "openclaw/plugin-sdk"; import { type ResolvedGoogleChatAccount } from "./accounts.js"; import { downloadGoogleChatMedia, deleteGoogleChatMessage, sendGoogleChatMessage, updateGoogleChatMessage, } from "./api.js"; import { type GoogleChatAudienceType } from "./auth.js"; import { applyGoogleChatInboundAccessPolicy, isSenderAllowed } from "./monitor-access.js"; import type { GoogleChatCoreRuntime, GoogleChatMonitorOptions, GoogleChatRuntimeEnv, WebhookTarget, } from "./monitor-types.js"; import { createGoogleChatWebhookRequestHandler } from "./monitor-webhook.js"; import { getGoogleChatRuntime } from "./runtime.js"; import type { GoogleChatAttachment, GoogleChatEvent } from "./types.js"; export type { GoogleChatMonitorOptions, GoogleChatRuntimeEnv } from "./monitor-types.js"; export { isSenderAllowed }; const webhookTargets = new Map(); const webhookInFlightLimiter = createWebhookInFlightLimiter(); const googleChatWebhookRequestHandler = createGoogleChatWebhookRequestHandler({ webhookTargets, webhookInFlightLimiter, processEvent: async (event, target) => { await processGoogleChatEvent(event, target); }, }); function logVerbose(core: GoogleChatCoreRuntime, runtime: GoogleChatRuntimeEnv, message: string) { if (core.logging.shouldLogVerbose()) { runtime.log?.(`[googlechat] ${message}`); } } export function registerGoogleChatWebhookTarget(target: WebhookTarget): () => void { return registerWebhookTargetWithPluginRoute({ targetsByPath: webhookTargets, target, route: { auth: "plugin", match: "exact", pluginId: "googlechat", source: "googlechat-webhook", accountId: target.account.accountId, log: target.runtime.log, handler: async (req, res) => { const handled = await handleGoogleChatWebhookRequest(req, res); if (!handled && !res.headersSent) { res.statusCode = 404; res.setHeader("Content-Type", "text/plain; charset=utf-8"); res.end("Not Found"); } }, }, }).unregister; } function normalizeAudienceType(value?: string | null): GoogleChatAudienceType | undefined { const normalized = value?.trim().toLowerCase(); if (normalized === "app-url" || normalized === "app_url" || normalized === "app") { return "app-url"; } if ( normalized === "project-number" || normalized === "project_number" || normalized === "project" ) { return "project-number"; } return undefined; } export async function handleGoogleChatWebhookRequest( req: IncomingMessage, res: ServerResponse, ): Promise { return await googleChatWebhookRequestHandler(req, res); } async function processGoogleChatEvent(event: GoogleChatEvent, target: WebhookTarget) { const eventType = event.type ?? (event as { eventType?: string }).eventType; if (eventType !== "MESSAGE") { return; } if (!event.message || !event.space) { return; } await processMessageWithPipeline({ event, account: target.account, config: target.config, runtime: target.runtime, core: target.core, statusSink: target.statusSink, mediaMaxMb: target.mediaMaxMb, }); } /** * Resolve bot display name with fallback chain: * 1. Account config name * 2. Agent name from config * 3. "OpenClaw" as generic fallback */ function resolveBotDisplayName(params: { accountName?: string; agentId: string; config: OpenClawConfig; }): string { const { accountName, agentId, config } = params; if (accountName?.trim()) { return accountName.trim(); } const agent = config.agents?.list?.find((a) => a.id === agentId); if (agent?.name?.trim()) { return agent.name.trim(); } return "OpenClaw"; } async function processMessageWithPipeline(params: { event: GoogleChatEvent; account: ResolvedGoogleChatAccount; config: OpenClawConfig; runtime: GoogleChatRuntimeEnv; core: GoogleChatCoreRuntime; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; mediaMaxMb: number; }): Promise { const { event, account, config, runtime, core, statusSink, mediaMaxMb } = params; const space = event.space; const message = event.message; if (!space || !message) { return; } const spaceId = space.name ?? ""; if (!spaceId) { return; } const spaceType = (space.type ?? "").toUpperCase(); const isGroup = spaceType !== "DM"; const sender = message.sender ?? event.user; const senderId = sender?.name ?? ""; const senderName = sender?.displayName ?? ""; const senderEmail = sender?.email ?? undefined; const allowBots = account.config.allowBots === true; if (!allowBots) { if (sender?.type?.toUpperCase() === "BOT") { logVerbose(core, runtime, `skip bot-authored message (${senderId || "unknown"})`); return; } if (senderId === "users/app") { logVerbose(core, runtime, "skip app-authored message"); return; } } const messageText = (message.argumentText ?? message.text ?? "").trim(); const attachments = message.attachment ?? []; const hasMedia = attachments.length > 0; const rawBody = messageText || (hasMedia ? "" : ""); if (!rawBody) { return; } const access = await applyGoogleChatInboundAccessPolicy({ account, config, core, space, message, isGroup, senderId, senderName, senderEmail, rawBody, statusSink, logVerbose: (message) => logVerbose(core, runtime, message), }); if (!access.ok) { return; } const { commandAuthorized, effectiveWasMentioned, groupSystemPrompt } = access; const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({ cfg: config, channel: "googlechat", accountId: account.accountId, peer: { kind: isGroup ? ("group" as const) : ("direct" as const), id: spaceId, }, runtime: core.channel, sessionStore: config.session?.store, }); let mediaPath: string | undefined; let mediaType: string | undefined; if (attachments.length > 0) { const first = attachments[0]; const attachmentData = await downloadAttachment(first, account, mediaMaxMb, core); if (attachmentData) { mediaPath = attachmentData.path; mediaType = attachmentData.contentType; } } const fromLabel = isGroup ? space.displayName || `space:${spaceId}` : senderName || `user:${senderId}`; const { storePath, body } = buildEnvelope({ channel: "Google Chat", from: fromLabel, timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined, body: rawBody, }); const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, BodyForAgent: rawBody, RawBody: rawBody, CommandBody: rawBody, From: `googlechat:${senderId}`, To: `googlechat:${spaceId}`, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "channel" : "direct", ConversationLabel: fromLabel, SenderName: senderName || undefined, SenderId: senderId, SenderUsername: senderEmail, WasMentioned: isGroup ? effectiveWasMentioned : undefined, CommandAuthorized: commandAuthorized, Provider: "googlechat", Surface: "googlechat", MessageSid: message.name, MessageSidFull: message.name, ReplyToId: message.thread?.name, ReplyToIdFull: message.thread?.name, MediaPath: mediaPath, MediaType: mediaType, MediaUrl: mediaPath, GroupSpace: isGroup ? (space.displayName ?? undefined) : undefined, GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, OriginatingChannel: "googlechat", OriginatingTo: `googlechat:${spaceId}`, }); void core.channel.session .recordSessionMetaFromInbound({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, }) .catch((err) => { runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`); }); // Typing indicator setup // Note: Reaction mode requires user OAuth, not available with service account auth. // If reaction is configured, we fall back to message mode with a warning. let typingIndicator = account.config.typingIndicator ?? "message"; if (typingIndicator === "reaction") { runtime.error?.( `[${account.accountId}] typingIndicator="reaction" requires user OAuth (not supported with service account). Falling back to "message" mode.`, ); typingIndicator = "message"; } let typingMessageName: string | undefined; // Start typing indicator (message mode only, reaction mode not supported with app auth) if (typingIndicator === "message") { try { const botName = resolveBotDisplayName({ accountName: account.config.name, agentId: route.agentId, config, }); const result = await sendGoogleChatMessage({ account, space: spaceId, text: `_${botName} is typing..._`, thread: message.thread?.name, }); typingMessageName = result?.messageName; } catch (err) { runtime.error?.(`Failed sending typing message: ${String(err)}`); } } const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg: config, agentId: route.agentId, channel: "googlechat", accountId: route.accountId, }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, dispatcherOptions: { ...prefixOptions, deliver: async (payload) => { await deliverGoogleChatReply({ payload, account, spaceId, runtime, core, config, statusSink, typingMessageName, }); // Only use typing message for first delivery typingMessageName = undefined; }, onError: (err, info) => { runtime.error?.( `[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`, ); }, }, replyOptions: { onModelSelected, }, }); } async function downloadAttachment( attachment: GoogleChatAttachment, account: ResolvedGoogleChatAccount, mediaMaxMb: number, core: GoogleChatCoreRuntime, ): Promise<{ path: string; contentType?: string } | null> { const resourceName = attachment.attachmentDataRef?.resourceName; if (!resourceName) { return null; } const maxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; const downloaded = await downloadGoogleChatMedia({ account, resourceName, maxBytes }); const saved = await core.channel.media.saveMediaBuffer( downloaded.buffer, downloaded.contentType ?? attachment.contentType, "inbound", maxBytes, attachment.contentName, ); return { path: saved.path, contentType: saved.contentType }; } async function deliverGoogleChatReply(params: { payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string; replyToId?: string }; account: ResolvedGoogleChatAccount; spaceId: string; runtime: GoogleChatRuntimeEnv; core: GoogleChatCoreRuntime; config: OpenClawConfig; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; typingMessageName?: string; }): Promise { const { payload, account, spaceId, runtime, core, config, statusSink, typingMessageName } = params; const mediaList = payload.mediaUrls?.length ? payload.mediaUrls : payload.mediaUrl ? [payload.mediaUrl] : []; if (mediaList.length > 0) { let suppressCaption = false; if (typingMessageName) { try { await deleteGoogleChatMessage({ account, messageName: typingMessageName, }); } catch (err) { runtime.error?.(`Google Chat typing cleanup failed: ${String(err)}`); const fallbackText = payload.text?.trim() ? payload.text : mediaList.length > 1 ? "Sent attachments." : "Sent attachment."; try { await updateGoogleChatMessage({ account, messageName: typingMessageName, text: fallbackText, }); suppressCaption = Boolean(payload.text?.trim()); } catch (updateErr) { runtime.error?.(`Google Chat typing update failed: ${String(updateErr)}`); } } } let first = true; for (const mediaUrl of mediaList) { const caption = first && !suppressCaption ? payload.text : undefined; first = false; try { const loaded = await core.channel.media.fetchRemoteMedia({ url: mediaUrl, maxBytes: (account.config.mediaMaxMb ?? 20) * 1024 * 1024, }); const upload = await uploadAttachmentForReply({ account, spaceId, buffer: loaded.buffer, contentType: loaded.contentType, filename: loaded.fileName ?? "attachment", }); if (!upload.attachmentUploadToken) { throw new Error("missing attachment upload token"); } await sendGoogleChatMessage({ account, space: spaceId, text: caption, thread: payload.replyToId, attachments: [ { attachmentUploadToken: upload.attachmentUploadToken, contentName: loaded.fileName }, ], }); statusSink?.({ lastOutboundAt: Date.now() }); } catch (err) { runtime.error?.(`Google Chat attachment send failed: ${String(err)}`); } } return; } if (payload.text) { const chunkLimit = account.config.textChunkLimit ?? 4000; const chunkMode = core.channel.text.resolveChunkMode(config, "googlechat", account.accountId); const chunks = core.channel.text.chunkMarkdownTextWithMode(payload.text, chunkLimit, chunkMode); for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; try { // Edit typing message with first chunk if available if (i === 0 && typingMessageName) { await updateGoogleChatMessage({ account, messageName: typingMessageName, text: chunk, }); } else { await sendGoogleChatMessage({ account, space: spaceId, text: chunk, thread: payload.replyToId, }); } statusSink?.({ lastOutboundAt: Date.now() }); } catch (err) { runtime.error?.(`Google Chat message send failed: ${String(err)}`); } } } } async function uploadAttachmentForReply(params: { account: ResolvedGoogleChatAccount; spaceId: string; buffer: Buffer; contentType?: string; filename: string; }) { const { account, spaceId, buffer, contentType, filename } = params; const { uploadGoogleChatAttachment } = await import("./api.js"); return await uploadGoogleChatAttachment({ account, space: spaceId, filename, buffer, contentType, }); } export function monitorGoogleChatProvider(options: GoogleChatMonitorOptions): () => void { const core = getGoogleChatRuntime(); const webhookPath = resolveWebhookPath({ webhookPath: options.webhookPath, webhookUrl: options.webhookUrl, defaultPath: "/googlechat", }); if (!webhookPath) { options.runtime.error?.(`[${options.account.accountId}] invalid webhook path`); return () => {}; } const audienceType = normalizeAudienceType(options.account.config.audienceType); const audience = options.account.config.audience?.trim(); const mediaMaxMb = options.account.config.mediaMaxMb ?? 20; const unregisterTarget = registerGoogleChatWebhookTarget({ account: options.account, config: options.config, runtime: options.runtime, core, path: webhookPath, audienceType, audience, statusSink: options.statusSink, mediaMaxMb, }); return () => { unregisterTarget(); }; } export async function startGoogleChatMonitor( params: GoogleChatMonitorOptions, ): Promise<() => void> { return monitorGoogleChatProvider(params); } export function resolveGoogleChatWebhookPath(params: { account: ResolvedGoogleChatAccount; }): string { return ( resolveWebhookPath({ webhookPath: params.account.config.webhookPath, webhookUrl: params.account.config.webhookUrl, defaultPath: "/googlechat", }) ?? "/googlechat" ); } export function computeGoogleChatMediaMaxMb(params: { account: ResolvedGoogleChatAccount }) { return params.account.config.mediaMaxMb ?? 20; }