Files
Moltbot/src/gateway/server-http.ts
2026-02-22 08:40:49 +01:00

696 lines
22 KiB
TypeScript

import {
createServer as createHttpServer,
type Server as HttpServer,
type IncomingMessage,
type ServerResponse,
} from "node:http";
import { createServer as createHttpsServer } from "node:https";
import type { TlsOptions } from "node:tls";
import type { WebSocketServer } from "ws";
import { resolveAgentAvatar } from "../agents/identity-avatar.js";
import {
A2UI_PATH,
CANVAS_HOST_PATH,
CANVAS_WS_PATH,
handleA2uiHttpRequest,
} from "../canvas-host/a2ui.js";
import type { CanvasHostHandler } from "../canvas-host/server.js";
import { loadConfig } from "../config/config.js";
import type { createSubsystemLogger } from "../logging/subsystem.js";
import { safeEqualSecret } from "../security/secret-equal.js";
import { handleSlackHttpRequest } from "../slack/http/index.js";
import { normalizeRateLimitClientIp, type AuthRateLimiter } from "./auth-rate-limit.js";
import {
authorizeHttpGatewayConnect,
isLocalDirectRequest,
type GatewayAuthResult,
type ResolvedGatewayAuth,
} from "./auth.js";
import { CANVAS_CAPABILITY_TTL_MS, normalizeCanvasScopedUrl } from "./canvas-capability.js";
import {
handleControlUiAvatarRequest,
handleControlUiHttpRequest,
type ControlUiRootState,
} from "./control-ui.js";
import { applyHookMappings } from "./hooks-mapping.js";
import {
extractHookToken,
getHookAgentPolicyError,
getHookChannelError,
type HookMessageChannel,
type HooksConfigResolved,
isHookAgentAllowed,
normalizeAgentPayload,
normalizeHookHeaders,
normalizeWakePayload,
readJsonBody,
resolveHookSessionKey,
resolveHookTargetAgentId,
resolveHookChannel,
resolveHookDeliver,
} from "./hooks.js";
import { sendGatewayAuthFailure, setDefaultSecurityHeaders } from "./http-common.js";
import { getBearerToken } from "./http-utils.js";
import { handleOpenAiHttpRequest } from "./openai-http.js";
import { handleOpenResponsesHttpRequest } from "./openresponses-http.js";
import { GATEWAY_CLIENT_MODES, normalizeGatewayClientMode } from "./protocol/client-info.js";
import type { GatewayWsClient } from "./server/ws-types.js";
import { handleToolsInvokeHttpRequest } from "./tools-invoke-http.js";
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
type HookAuthFailure = { count: number; windowStartedAtMs: number };
const HOOK_AUTH_FAILURE_LIMIT = 20;
const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000;
const HOOK_AUTH_FAILURE_TRACK_MAX = 2048;
type HookDispatchers = {
dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void;
dispatchAgentHook: (value: {
message: string;
name: string;
agentId?: string;
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel: HookMessageChannel;
to?: string;
model?: string;
thinking?: string;
timeoutSeconds?: number;
allowUnsafeExternalContent?: boolean;
}) => string;
};
function sendJson(res: ServerResponse, status: number, body: unknown) {
res.statusCode = status;
res.setHeader("Content-Type", "application/json; charset=utf-8");
res.end(JSON.stringify(body));
}
function isCanvasPath(pathname: string): boolean {
return (
pathname === A2UI_PATH ||
pathname.startsWith(`${A2UI_PATH}/`) ||
pathname === CANVAS_HOST_PATH ||
pathname.startsWith(`${CANVAS_HOST_PATH}/`) ||
pathname === CANVAS_WS_PATH
);
}
function isNodeWsClient(client: GatewayWsClient): boolean {
if (client.connect.role === "node") {
return true;
}
return normalizeGatewayClientMode(client.connect.client.mode) === GATEWAY_CLIENT_MODES.NODE;
}
function hasAuthorizedNodeWsClientForCanvasCapability(
clients: Set<GatewayWsClient>,
capability: string,
): boolean {
const nowMs = Date.now();
for (const client of clients) {
if (!isNodeWsClient(client)) {
continue;
}
if (!client.canvasCapability || !client.canvasCapabilityExpiresAtMs) {
continue;
}
if (client.canvasCapabilityExpiresAtMs <= nowMs) {
continue;
}
if (safeEqualSecret(client.canvasCapability, capability)) {
// Sliding expiration while the connected node keeps using canvas.
client.canvasCapabilityExpiresAtMs = nowMs + CANVAS_CAPABILITY_TTL_MS;
return true;
}
}
return false;
}
async function authorizeCanvasRequest(params: {
req: IncomingMessage;
auth: ResolvedGatewayAuth;
trustedProxies: string[];
allowRealIpFallback: boolean;
clients: Set<GatewayWsClient>;
canvasCapability?: string;
malformedScopedPath?: boolean;
rateLimiter?: AuthRateLimiter;
}): Promise<GatewayAuthResult> {
const {
req,
auth,
trustedProxies,
allowRealIpFallback,
clients,
canvasCapability,
malformedScopedPath,
rateLimiter,
} = params;
if (malformedScopedPath) {
return { ok: false, reason: "unauthorized" };
}
if (isLocalDirectRequest(req, trustedProxies, allowRealIpFallback)) {
return { ok: true };
}
let lastAuthFailure: GatewayAuthResult | null = null;
const token = getBearerToken(req);
if (token) {
const authResult = await authorizeHttpGatewayConnect({
auth: { ...auth, allowTailscale: false },
connectAuth: { token, password: token },
req,
trustedProxies,
allowRealIpFallback,
rateLimiter,
});
if (authResult.ok) {
return authResult;
}
lastAuthFailure = authResult;
}
if (canvasCapability && hasAuthorizedNodeWsClientForCanvasCapability(clients, canvasCapability)) {
return { ok: true };
}
return lastAuthFailure ?? { ok: false, reason: "unauthorized" };
}
function writeUpgradeAuthFailure(
socket: { write: (chunk: string) => void },
auth: GatewayAuthResult,
) {
if (auth.rateLimited) {
const retryAfterSeconds =
auth.retryAfterMs && auth.retryAfterMs > 0 ? Math.ceil(auth.retryAfterMs / 1000) : undefined;
socket.write(
[
"HTTP/1.1 429 Too Many Requests",
retryAfterSeconds ? `Retry-After: ${retryAfterSeconds}` : undefined,
"Content-Type: application/json; charset=utf-8",
"Connection: close",
"",
JSON.stringify({
error: {
message: "Too many failed authentication attempts. Please try again later.",
type: "rate_limited",
},
}),
]
.filter(Boolean)
.join("\r\n"),
);
return;
}
socket.write("HTTP/1.1 401 Unauthorized\r\nConnection: close\r\n\r\n");
}
export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise<boolean>;
export function createHooksRequestHandler(
opts: {
getHooksConfig: () => HooksConfigResolved | null;
bindHost: string;
port: number;
logHooks: SubsystemLogger;
} & HookDispatchers,
): HooksRequestHandler {
const { getHooksConfig, bindHost, port, logHooks, dispatchAgentHook, dispatchWakeHook } = opts;
const hookAuthFailures = new Map<string, HookAuthFailure>();
const resolveHookClientKey = (req: IncomingMessage): string => {
return normalizeRateLimitClientIp(req.socket?.remoteAddress);
};
const recordHookAuthFailure = (
clientKey: string,
nowMs: number,
): { throttled: boolean; retryAfterSeconds?: number } => {
if (!hookAuthFailures.has(clientKey) && hookAuthFailures.size >= HOOK_AUTH_FAILURE_TRACK_MAX) {
// Prune expired entries instead of clearing all state.
for (const [key, entry] of hookAuthFailures) {
if (nowMs - entry.windowStartedAtMs >= HOOK_AUTH_FAILURE_WINDOW_MS) {
hookAuthFailures.delete(key);
}
}
// If still at capacity after pruning, drop the oldest half.
if (hookAuthFailures.size >= HOOK_AUTH_FAILURE_TRACK_MAX) {
let toRemove = Math.floor(hookAuthFailures.size / 2);
for (const key of hookAuthFailures.keys()) {
if (toRemove <= 0) {
break;
}
hookAuthFailures.delete(key);
toRemove--;
}
}
}
const current = hookAuthFailures.get(clientKey);
const expired = !current || nowMs - current.windowStartedAtMs >= HOOK_AUTH_FAILURE_WINDOW_MS;
const next: HookAuthFailure = expired
? { count: 1, windowStartedAtMs: nowMs }
: { count: current.count + 1, windowStartedAtMs: current.windowStartedAtMs };
// Delete-before-set refreshes Map insertion order so recently-active
// clients are not evicted before dormant ones during oldest-half eviction.
if (hookAuthFailures.has(clientKey)) {
hookAuthFailures.delete(clientKey);
}
hookAuthFailures.set(clientKey, next);
if (next.count <= HOOK_AUTH_FAILURE_LIMIT) {
return { throttled: false };
}
const retryAfterMs = Math.max(1, next.windowStartedAtMs + HOOK_AUTH_FAILURE_WINDOW_MS - nowMs);
return {
throttled: true,
retryAfterSeconds: Math.ceil(retryAfterMs / 1000),
};
};
const clearHookAuthFailure = (clientKey: string) => {
hookAuthFailures.delete(clientKey);
};
return async (req, res) => {
const hooksConfig = getHooksConfig();
if (!hooksConfig) {
return false;
}
const url = new URL(req.url ?? "/", `http://${bindHost}:${port}`);
const basePath = hooksConfig.basePath;
if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) {
return false;
}
if (url.searchParams.has("token")) {
res.statusCode = 400;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end(
"Hook token must be provided via Authorization: Bearer <token> or X-OpenClaw-Token header (query parameters are not allowed).",
);
return true;
}
const token = extractHookToken(req);
const clientKey = resolveHookClientKey(req);
if (!safeEqualSecret(token, hooksConfig.token)) {
const throttle = recordHookAuthFailure(clientKey, Date.now());
if (throttle.throttled) {
const retryAfter = throttle.retryAfterSeconds ?? 1;
res.statusCode = 429;
res.setHeader("Retry-After", String(retryAfter));
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Too Many Requests");
logHooks.warn(`hook auth throttled for ${clientKey}; retry-after=${retryAfter}s`);
return true;
}
res.statusCode = 401;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Unauthorized");
return true;
}
clearHookAuthFailure(clientKey);
if (req.method !== "POST") {
res.statusCode = 405;
res.setHeader("Allow", "POST");
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Method Not Allowed");
return true;
}
const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, "");
if (!subPath) {
res.statusCode = 404;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Not Found");
return true;
}
const body = await readJsonBody(req, hooksConfig.maxBodyBytes);
if (!body.ok) {
const status =
body.error === "payload too large"
? 413
: body.error === "request body timeout"
? 408
: 400;
sendJson(res, status, { ok: false, error: body.error });
return true;
}
const payload = typeof body.value === "object" && body.value !== null ? body.value : {};
const headers = normalizeHookHeaders(req);
if (subPath === "wake") {
const normalized = normalizeWakePayload(payload as Record<string, unknown>);
if (!normalized.ok) {
sendJson(res, 400, { ok: false, error: normalized.error });
return true;
}
dispatchWakeHook(normalized.value);
sendJson(res, 200, { ok: true, mode: normalized.value.mode });
return true;
}
if (subPath === "agent") {
const normalized = normalizeAgentPayload(payload as Record<string, unknown>);
if (!normalized.ok) {
sendJson(res, 400, { ok: false, error: normalized.error });
return true;
}
if (!isHookAgentAllowed(hooksConfig, normalized.value.agentId)) {
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
return true;
}
const sessionKey = resolveHookSessionKey({
hooksConfig,
source: "request",
sessionKey: normalized.value.sessionKey,
});
if (!sessionKey.ok) {
sendJson(res, 400, { ok: false, error: sessionKey.error });
return true;
}
const runId = dispatchAgentHook({
...normalized.value,
sessionKey: sessionKey.value,
agentId: resolveHookTargetAgentId(hooksConfig, normalized.value.agentId),
});
sendJson(res, 202, { ok: true, runId });
return true;
}
if (hooksConfig.mappings.length > 0) {
try {
const mapped = await applyHookMappings(hooksConfig.mappings, {
payload: payload as Record<string, unknown>,
headers,
url,
path: subPath,
});
if (mapped) {
if (!mapped.ok) {
sendJson(res, 400, { ok: false, error: mapped.error });
return true;
}
if (mapped.action === null) {
res.statusCode = 204;
res.end();
return true;
}
if (mapped.action.kind === "wake") {
dispatchWakeHook({
text: mapped.action.text,
mode: mapped.action.mode,
});
sendJson(res, 200, { ok: true, mode: mapped.action.mode });
return true;
}
const channel = resolveHookChannel(mapped.action.channel);
if (!channel) {
sendJson(res, 400, { ok: false, error: getHookChannelError() });
return true;
}
if (!isHookAgentAllowed(hooksConfig, mapped.action.agentId)) {
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
return true;
}
const sessionKey = resolveHookSessionKey({
hooksConfig,
source: "mapping",
sessionKey: mapped.action.sessionKey,
});
if (!sessionKey.ok) {
sendJson(res, 400, { ok: false, error: sessionKey.error });
return true;
}
const runId = dispatchAgentHook({
message: mapped.action.message,
name: mapped.action.name ?? "Hook",
agentId: resolveHookTargetAgentId(hooksConfig, mapped.action.agentId),
wakeMode: mapped.action.wakeMode,
sessionKey: sessionKey.value,
deliver: resolveHookDeliver(mapped.action.deliver),
channel,
to: mapped.action.to,
model: mapped.action.model,
thinking: mapped.action.thinking,
timeoutSeconds: mapped.action.timeoutSeconds,
allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent,
});
sendJson(res, 202, { ok: true, runId });
return true;
}
} catch (err) {
logHooks.warn(`hook mapping failed: ${String(err)}`);
sendJson(res, 500, { ok: false, error: "hook mapping failed" });
return true;
}
}
res.statusCode = 404;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Not Found");
return true;
};
}
export function createGatewayHttpServer(opts: {
canvasHost: CanvasHostHandler | null;
clients: Set<GatewayWsClient>;
controlUiEnabled: boolean;
controlUiBasePath: string;
controlUiRoot?: ControlUiRootState;
openAiChatCompletionsEnabled: boolean;
openResponsesEnabled: boolean;
openResponsesConfig?: import("../config/types.gateway.js").GatewayHttpResponsesConfig;
handleHooksRequest: HooksRequestHandler;
handlePluginRequest?: HooksRequestHandler;
resolvedAuth: ResolvedGatewayAuth;
/** Optional rate limiter for auth brute-force protection. */
rateLimiter?: AuthRateLimiter;
tlsOptions?: TlsOptions;
}): HttpServer {
const {
canvasHost,
clients,
controlUiEnabled,
controlUiBasePath,
controlUiRoot,
openAiChatCompletionsEnabled,
openResponsesEnabled,
openResponsesConfig,
handleHooksRequest,
handlePluginRequest,
resolvedAuth,
rateLimiter,
} = opts;
const httpServer: HttpServer = opts.tlsOptions
? createHttpsServer(opts.tlsOptions, (req, res) => {
void handleRequest(req, res);
})
: createHttpServer((req, res) => {
void handleRequest(req, res);
});
async function handleRequest(req: IncomingMessage, res: ServerResponse) {
setDefaultSecurityHeaders(res);
// Don't interfere with WebSocket upgrades; ws handles the 'upgrade' event.
if (String(req.headers.upgrade ?? "").toLowerCase() === "websocket") {
return;
}
try {
const configSnapshot = loadConfig();
const trustedProxies = configSnapshot.gateway?.trustedProxies ?? [];
const allowRealIpFallback = configSnapshot.gateway?.allowRealIpFallback === true;
const scopedCanvas = normalizeCanvasScopedUrl(req.url ?? "/");
if (scopedCanvas.malformedScopedPath) {
sendGatewayAuthFailure(res, { ok: false, reason: "unauthorized" });
return;
}
if (scopedCanvas.rewrittenUrl) {
req.url = scopedCanvas.rewrittenUrl;
}
const requestPath = new URL(req.url ?? "/", "http://localhost").pathname;
if (await handleHooksRequest(req, res)) {
return;
}
if (
await handleToolsInvokeHttpRequest(req, res, {
auth: resolvedAuth,
trustedProxies,
allowRealIpFallback,
rateLimiter,
})
) {
return;
}
if (await handleSlackHttpRequest(req, res)) {
return;
}
if (handlePluginRequest) {
// Channel HTTP endpoints are gateway-auth protected by default.
// Non-channel plugin routes remain plugin-owned and must enforce
// their own auth when exposing sensitive functionality.
if (requestPath.startsWith("/api/channels/")) {
const token = getBearerToken(req);
const authResult = await authorizeHttpGatewayConnect({
auth: resolvedAuth,
connectAuth: token ? { token, password: token } : null,
req,
trustedProxies,
allowRealIpFallback,
rateLimiter,
});
if (!authResult.ok) {
sendGatewayAuthFailure(res, authResult);
return;
}
}
if (await handlePluginRequest(req, res)) {
return;
}
}
if (openResponsesEnabled) {
if (
await handleOpenResponsesHttpRequest(req, res, {
auth: resolvedAuth,
config: openResponsesConfig,
trustedProxies,
allowRealIpFallback,
rateLimiter,
})
) {
return;
}
}
if (openAiChatCompletionsEnabled) {
if (
await handleOpenAiHttpRequest(req, res, {
auth: resolvedAuth,
trustedProxies,
allowRealIpFallback,
rateLimiter,
})
) {
return;
}
}
if (canvasHost) {
if (isCanvasPath(requestPath)) {
const ok = await authorizeCanvasRequest({
req,
auth: resolvedAuth,
trustedProxies,
allowRealIpFallback,
clients,
canvasCapability: scopedCanvas.capability,
malformedScopedPath: scopedCanvas.malformedScopedPath,
rateLimiter,
});
if (!ok.ok) {
sendGatewayAuthFailure(res, ok);
return;
}
}
if (await handleA2uiHttpRequest(req, res)) {
return;
}
if (await canvasHost.handleHttpRequest(req, res)) {
return;
}
}
if (controlUiEnabled) {
if (
handleControlUiAvatarRequest(req, res, {
basePath: controlUiBasePath,
resolveAvatar: (agentId) => resolveAgentAvatar(configSnapshot, agentId),
})
) {
return;
}
if (
handleControlUiHttpRequest(req, res, {
basePath: controlUiBasePath,
config: configSnapshot,
root: controlUiRoot,
})
) {
return;
}
}
res.statusCode = 404;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Not Found");
} catch {
res.statusCode = 500;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Internal Server Error");
}
}
return httpServer;
}
export function attachGatewayUpgradeHandler(opts: {
httpServer: HttpServer;
wss: WebSocketServer;
canvasHost: CanvasHostHandler | null;
clients: Set<GatewayWsClient>;
resolvedAuth: ResolvedGatewayAuth;
/** Optional rate limiter for auth brute-force protection. */
rateLimiter?: AuthRateLimiter;
}) {
const { httpServer, wss, canvasHost, clients, resolvedAuth, rateLimiter } = opts;
httpServer.on("upgrade", (req, socket, head) => {
void (async () => {
const scopedCanvas = normalizeCanvasScopedUrl(req.url ?? "/");
if (scopedCanvas.malformedScopedPath) {
writeUpgradeAuthFailure(socket, { ok: false, reason: "unauthorized" });
socket.destroy();
return;
}
if (scopedCanvas.rewrittenUrl) {
req.url = scopedCanvas.rewrittenUrl;
}
if (canvasHost) {
const url = new URL(req.url ?? "/", "http://localhost");
if (url.pathname === CANVAS_WS_PATH) {
const configSnapshot = loadConfig();
const trustedProxies = configSnapshot.gateway?.trustedProxies ?? [];
const allowRealIpFallback = configSnapshot.gateway?.allowRealIpFallback === true;
const ok = await authorizeCanvasRequest({
req,
auth: resolvedAuth,
trustedProxies,
allowRealIpFallback,
clients,
canvasCapability: scopedCanvas.capability,
malformedScopedPath: scopedCanvas.malformedScopedPath,
rateLimiter,
});
if (!ok.ok) {
writeUpgradeAuthFailure(socket, ok);
socket.destroy();
return;
}
}
if (canvasHost.handleUpgrade(req, socket, head)) {
return;
}
}
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
})().catch(() => {
socket.destroy();
});
});
}