Files
Moltbot/extensions/zalo/src/monitor.ts
2026-03-04 02:35:13 -05:00

696 lines
18 KiB
TypeScript

import type { IncomingMessage, ServerResponse } from "node:http";
import type {
MarkdownTableMode,
OpenClawConfig,
OutboundReplyPayload,
} from "openclaw/plugin-sdk/zalo";
import {
createScopedPairingAccess,
createReplyPrefixOptions,
resolveDirectDmAuthorizationOutcome,
resolveSenderCommandAuthorizationWithRuntime,
resolveOutboundMediaUrls,
resolveDefaultGroupPolicy,
resolveInboundRouteEnvelopeBuilderWithRuntime,
sendMediaWithLeadingCaption,
resolveWebhookPath,
warnMissingProviderGroupPolicyFallbackOnce,
} from "openclaw/plugin-sdk/zalo";
import type { ResolvedZaloAccount } from "./accounts.js";
import {
ZaloApiError,
deleteWebhook,
getUpdates,
sendMessage,
sendPhoto,
setWebhook,
type ZaloFetch,
type ZaloMessage,
type ZaloUpdate,
} from "./api.js";
import {
evaluateZaloGroupAccess,
isZaloSenderAllowed,
resolveZaloRuntimeGroupPolicy,
} from "./group-access.js";
import {
clearZaloWebhookSecurityStateForTest,
getZaloWebhookRateLimitStateSizeForTest,
getZaloWebhookStatusCounterSizeForTest,
handleZaloWebhookRequest as handleZaloWebhookRequestInternal,
registerZaloWebhookTarget as registerZaloWebhookTargetInternal,
type ZaloWebhookTarget,
} from "./monitor.webhook.js";
import { resolveZaloProxyFetch } from "./proxy.js";
import { getZaloRuntime } from "./runtime.js";
export type ZaloRuntimeEnv = {
log?: (message: string) => void;
error?: (message: string) => void;
};
export type ZaloMonitorOptions = {
token: string;
account: ResolvedZaloAccount;
config: OpenClawConfig;
runtime: ZaloRuntimeEnv;
abortSignal: AbortSignal;
useWebhook?: boolean;
webhookUrl?: string;
webhookSecret?: string;
webhookPath?: string;
fetcher?: ZaloFetch;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
};
export type ZaloMonitorResult = {
stop: () => void;
};
const ZALO_TEXT_LIMIT = 2000;
const DEFAULT_MEDIA_MAX_MB = 5;
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
if (core.logging.shouldLogVerbose()) {
runtime.log?.(`[zalo] ${message}`);
}
}
export function registerZaloWebhookTarget(target: ZaloWebhookTarget): () => void {
return registerZaloWebhookTargetInternal(target, {
route: {
auth: "plugin",
match: "exact",
pluginId: "zalo",
source: "zalo-webhook",
accountId: target.account.accountId,
log: target.runtime.log,
handler: async (req, res) => {
const handled = await handleZaloWebhookRequest(req, res);
if (!handled && !res.headersSent) {
res.statusCode = 404;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Not Found");
}
},
},
});
}
export {
clearZaloWebhookSecurityStateForTest,
getZaloWebhookRateLimitStateSizeForTest,
getZaloWebhookStatusCounterSizeForTest,
};
export async function handleZaloWebhookRequest(
req: IncomingMessage,
res: ServerResponse,
): Promise<boolean> {
return handleZaloWebhookRequestInternal(req, res, async ({ update, target }) => {
await processUpdate(
update,
target.token,
target.account,
target.config,
target.runtime,
target.core as ZaloCoreRuntime,
target.mediaMaxMb,
target.statusSink,
target.fetcher,
);
});
}
function startPollingLoop(params: {
token: string;
account: ResolvedZaloAccount;
config: OpenClawConfig;
runtime: ZaloRuntimeEnv;
core: ZaloCoreRuntime;
abortSignal: AbortSignal;
isStopped: () => boolean;
mediaMaxMb: number;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
fetcher?: ZaloFetch;
}) {
const {
token,
account,
config,
runtime,
core,
abortSignal,
isStopped,
mediaMaxMb,
statusSink,
fetcher,
} = params;
const pollTimeout = 30;
const poll = async () => {
if (isStopped() || abortSignal.aborted) {
return;
}
try {
const response = await getUpdates(token, { timeout: pollTimeout }, fetcher);
if (response.ok && response.result) {
statusSink?.({ lastInboundAt: Date.now() });
await processUpdate(
response.result,
token,
account,
config,
runtime,
core,
mediaMaxMb,
statusSink,
fetcher,
);
}
} catch (err) {
if (err instanceof ZaloApiError && err.isPollingTimeout) {
// no updates
} else if (!isStopped() && !abortSignal.aborted) {
runtime.error?.(`[${account.accountId}] Zalo polling error: ${String(err)}`);
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
if (!isStopped() && !abortSignal.aborted) {
setImmediate(poll);
}
};
void poll();
}
async function processUpdate(
update: ZaloUpdate,
token: string,
account: ResolvedZaloAccount,
config: OpenClawConfig,
runtime: ZaloRuntimeEnv,
core: ZaloCoreRuntime,
mediaMaxMb: number,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
): Promise<void> {
const { event_name, message } = update;
if (!message) {
return;
}
switch (event_name) {
case "message.text.received":
await handleTextMessage(message, token, account, config, runtime, core, statusSink, fetcher);
break;
case "message.image.received":
await handleImageMessage(
message,
token,
account,
config,
runtime,
core,
mediaMaxMb,
statusSink,
fetcher,
);
break;
case "message.sticker.received":
logVerbose(core, runtime, `[${account.accountId}] Received sticker from ${message.from.id}`);
break;
case "message.unsupported.received":
logVerbose(
core,
runtime,
`[${account.accountId}] Received unsupported message type from ${message.from.id}`,
);
break;
}
}
async function handleTextMessage(
message: ZaloMessage,
token: string,
account: ResolvedZaloAccount,
config: OpenClawConfig,
runtime: ZaloRuntimeEnv,
core: ZaloCoreRuntime,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
): Promise<void> {
const { text } = message;
if (!text?.trim()) {
return;
}
await processMessageWithPipeline({
message,
token,
account,
config,
runtime,
core,
text,
mediaPath: undefined,
mediaType: undefined,
statusSink,
fetcher,
});
}
async function handleImageMessage(
message: ZaloMessage,
token: string,
account: ResolvedZaloAccount,
config: OpenClawConfig,
runtime: ZaloRuntimeEnv,
core: ZaloCoreRuntime,
mediaMaxMb: number,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
): Promise<void> {
const { photo, caption } = message;
let mediaPath: string | undefined;
let mediaType: string | undefined;
if (photo) {
try {
const maxBytes = mediaMaxMb * 1024 * 1024;
const fetched = await core.channel.media.fetchRemoteMedia({ url: photo, maxBytes });
const saved = await core.channel.media.saveMediaBuffer(
fetched.buffer,
fetched.contentType,
"inbound",
maxBytes,
);
mediaPath = saved.path;
mediaType = saved.contentType;
} catch (err) {
runtime.error?.(`[${account.accountId}] Failed to download Zalo image: ${String(err)}`);
}
}
await processMessageWithPipeline({
message,
token,
account,
config,
runtime,
core,
text: caption,
mediaPath,
mediaType,
statusSink,
fetcher,
});
}
async function processMessageWithPipeline(params: {
message: ZaloMessage;
token: string;
account: ResolvedZaloAccount;
config: OpenClawConfig;
runtime: ZaloRuntimeEnv;
core: ZaloCoreRuntime;
text?: string;
mediaPath?: string;
mediaType?: string;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
fetcher?: ZaloFetch;
}): Promise<void> {
const {
message,
token,
account,
config,
runtime,
core,
text,
mediaPath,
mediaType,
statusSink,
fetcher,
} = params;
const pairing = createScopedPairingAccess({
core,
channel: "zalo",
accountId: account.accountId,
});
const { from, chat, message_id, date } = message;
const isGroup = chat.chat_type === "GROUP";
const chatId = chat.id;
const senderId = from.id;
const senderName = from.name;
const dmPolicy = account.config.dmPolicy ?? "pairing";
const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v));
const configuredGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v));
const groupAllowFrom =
configuredGroupAllowFrom.length > 0 ? configuredGroupAllowFrom : configAllowFrom;
const defaultGroupPolicy = resolveDefaultGroupPolicy(config);
const groupAccess = isGroup
? evaluateZaloGroupAccess({
providerConfigPresent: config.channels?.zalo !== undefined,
configuredGroupPolicy: account.config.groupPolicy,
defaultGroupPolicy,
groupAllowFrom,
senderId,
})
: undefined;
if (groupAccess) {
warnMissingProviderGroupPolicyFallbackOnce({
providerMissingFallbackApplied: groupAccess.providerMissingFallbackApplied,
providerKey: "zalo",
accountId: account.accountId,
log: (message) => logVerbose(core, runtime, message),
});
if (!groupAccess.allowed) {
if (groupAccess.reason === "disabled") {
logVerbose(core, runtime, `zalo: drop group ${chatId} (groupPolicy=disabled)`);
} else if (groupAccess.reason === "empty_allowlist") {
logVerbose(
core,
runtime,
`zalo: drop group ${chatId} (groupPolicy=allowlist, no groupAllowFrom)`,
);
} else if (groupAccess.reason === "sender_not_allowlisted") {
logVerbose(core, runtime, `zalo: drop group sender ${senderId} (groupPolicy=allowlist)`);
}
return;
}
}
const rawBody = text?.trim() || (mediaPath ? "<media:image>" : "");
const { senderAllowedForCommands, commandAuthorized } =
await resolveSenderCommandAuthorizationWithRuntime({
cfg: config,
rawBody,
isGroup,
dmPolicy,
configuredAllowFrom: configAllowFrom,
configuredGroupAllowFrom: groupAllowFrom,
senderId,
isSenderAllowed: isZaloSenderAllowed,
readAllowFromStore: pairing.readAllowFromStore,
runtime: core.channel.commands,
});
const directDmOutcome = resolveDirectDmAuthorizationOutcome({
isGroup,
dmPolicy,
senderAllowedForCommands,
});
if (directDmOutcome === "disabled") {
logVerbose(core, runtime, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`);
return;
}
if (directDmOutcome === "unauthorized") {
if (dmPolicy === "pairing") {
const { code, created } = await pairing.upsertPairingRequest({
id: senderId,
meta: { name: senderName ?? undefined },
});
if (created) {
logVerbose(core, runtime, `zalo pairing request sender=${senderId}`);
try {
await sendMessage(
token,
{
chat_id: chatId,
text: core.channel.pairing.buildPairingReply({
channel: "zalo",
idLine: `Your Zalo user id: ${senderId}`,
code,
}),
},
fetcher,
);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
logVerbose(core, runtime, `zalo pairing reply failed for ${senderId}: ${String(err)}`);
}
}
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({
cfg: config,
channel: "zalo",
accountId: account.accountId,
peer: {
kind: isGroup ? ("group" as const) : ("direct" as const),
id: chatId,
},
runtime: core.channel,
sessionStore: config.session?.store,
});
if (
isGroup &&
core.channel.commands.isControlCommandMessage(rawBody, config) &&
commandAuthorized !== true
) {
logVerbose(core, runtime, `zalo: drop control command from unauthorized sender ${senderId}`);
return;
}
const fromLabel = isGroup ? `group:${chatId}` : senderName || `user:${senderId}`;
const { storePath, body } = buildEnvelope({
channel: "Zalo",
from: fromLabel,
timestamp: date ? date * 1000 : undefined,
body: rawBody,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: rawBody,
RawBody: rawBody,
CommandBody: rawBody,
From: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`,
To: `zalo:${chatId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
SenderName: senderName || undefined,
SenderId: senderId,
CommandAuthorized: commandAuthorized,
Provider: "zalo",
Surface: "zalo",
MessageSid: message_id,
MediaPath: mediaPath,
MediaType: mediaType,
MediaUrl: mediaPath,
OriginatingChannel: "zalo",
OriginatingTo: `zalo:${chatId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalo",
accountId: account.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg: config,
agentId: route.agentId,
channel: "zalo",
accountId: account.accountId,
});
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config,
dispatcherOptions: {
...prefixOptions,
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`);
},
},
replyOptions: {
onModelSelected,
},
});
}
async function deliverZaloReply(params: {
payload: OutboundReplyPayload;
token: string;
chatId: string;
runtime: ZaloRuntimeEnv;
core: ZaloCoreRuntime;
config: OpenClawConfig;
accountId?: string;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
fetcher?: ZaloFetch;
tableMode?: MarkdownTableMode;
}): Promise<void> {
const { payload, token, chatId, runtime, core, config, accountId, statusSink, fetcher } = params;
const tableMode = params.tableMode ?? "code";
const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode);
const sentMedia = await sendMediaWithLeadingCaption({
mediaUrls: resolveOutboundMediaUrls(payload),
caption: text,
send: async ({ mediaUrl, caption }) => {
await sendPhoto(token, { chat_id: chatId, photo: mediaUrl, caption }, fetcher);
statusSink?.({ lastOutboundAt: Date.now() });
},
onError: (error) => {
runtime.error?.(`Zalo photo send failed: ${String(error)}`);
},
});
if (sentMedia) {
return;
}
if (text) {
const chunkMode = core.channel.text.resolveChunkMode(config, "zalo", accountId);
const chunks = core.channel.text.chunkMarkdownTextWithMode(text, ZALO_TEXT_LIMIT, chunkMode);
for (const chunk of chunks) {
try {
await sendMessage(token, { chat_id: chatId, text: chunk }, fetcher);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
runtime.error?.(`Zalo message send failed: ${String(err)}`);
}
}
}
}
export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise<ZaloMonitorResult> {
const {
token,
account,
config,
runtime,
abortSignal,
useWebhook,
webhookUrl,
webhookSecret,
webhookPath,
statusSink,
fetcher: fetcherOverride,
} = options;
const core = getZaloRuntime();
const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy);
let stopped = false;
const stopHandlers: Array<() => void> = [];
const stop = () => {
stopped = true;
for (const handler of stopHandlers) {
handler();
}
};
if (useWebhook) {
if (!webhookUrl || !webhookSecret) {
throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode");
}
if (!webhookUrl.startsWith("https://")) {
throw new Error("Zalo webhook URL must use HTTPS");
}
if (webhookSecret.length < 8 || webhookSecret.length > 256) {
throw new Error("Zalo webhook secret must be 8-256 characters");
}
const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null });
if (!path) {
throw new Error("Zalo webhookPath could not be derived");
}
await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher);
const unregister = registerZaloWebhookTarget({
token,
account,
config,
runtime,
core,
path,
secret: webhookSecret,
statusSink: (patch) => statusSink?.(patch),
mediaMaxMb: effectiveMediaMaxMb,
fetcher,
});
stopHandlers.push(unregister);
abortSignal.addEventListener(
"abort",
() => {
void deleteWebhook(token, fetcher).catch(() => {});
},
{ once: true },
);
return { stop };
}
try {
await deleteWebhook(token, fetcher);
} catch {
// ignore
}
startPollingLoop({
token,
account,
config,
runtime,
core,
abortSignal,
isStopped: () => stopped,
mediaMaxMb: effectiveMediaMaxMb,
statusSink,
fetcher,
});
return { stop };
}
export const __testing = {
evaluateZaloGroupAccess,
resolveZaloRuntimeGroupPolicy,
};