749 lines
25 KiB
TypeScript
749 lines
25 KiB
TypeScript
import path from "node:path";
|
|
import type { CanvasHostServer } from "../canvas-host/server.js";
|
|
import type { PluginRegistry } from "../plugins/registry.js";
|
|
import type { PluginServicesHandle } from "../plugins/services.js";
|
|
import type { RuntimeEnv } from "../runtime.js";
|
|
import type { ControlUiRootState } from "./control-ui.js";
|
|
import type { startBrowserControlServerIfEnabled } from "./server-browser.js";
|
|
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
|
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
|
|
import { registerSkillsChangeListener } from "../agents/skills/refresh.js";
|
|
import { initSubagentRegistry } from "../agents/subagent-registry.js";
|
|
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
|
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
|
import { formatCliCommand } from "../cli/command-format.js";
|
|
import { createDefaultDeps } from "../cli/deps.js";
|
|
import {
|
|
CONFIG_PATH,
|
|
isNixMode,
|
|
loadConfig,
|
|
migrateLegacyConfig,
|
|
readConfigFileSnapshot,
|
|
writeConfigFile,
|
|
} from "../config/config.js";
|
|
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
|
import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
|
|
import {
|
|
ensureControlUiAssetsBuilt,
|
|
resolveControlUiRootOverrideSync,
|
|
resolveControlUiRootSync,
|
|
} from "../infra/control-ui-assets.js";
|
|
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
|
|
import { logAcceptedEnvOption } from "../infra/env.js";
|
|
import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js";
|
|
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
|
import { startHeartbeatRunner, type HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
|
import { getMachineDisplayName } from "../infra/machine-name.js";
|
|
import { ensureOpenClawCliOnPath } from "../infra/path-env.js";
|
|
import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js";
|
|
import {
|
|
primeRemoteSkillsCache,
|
|
refreshRemoteBinsForConnectedNodes,
|
|
setSkillsRemoteRegistry,
|
|
} from "../infra/skills-remote.js";
|
|
import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js";
|
|
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
|
|
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
|
|
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
|
import { getTotalQueueSize } from "../process/command-queue.js";
|
|
import { runOnboardingWizard } from "../wizard/onboarding.js";
|
|
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
|
|
import { startGatewayConfigReloader } from "./config-reload.js";
|
|
import { ExecApprovalManager } from "./exec-approval-manager.js";
|
|
import { NodeRegistry } from "./node-registry.js";
|
|
import { createChannelManager } from "./server-channels.js";
|
|
import { createAgentEventHandler } from "./server-chat.js";
|
|
import { createGatewayCloseHandler } from "./server-close.js";
|
|
import { buildGatewayCronService } from "./server-cron.js";
|
|
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
|
|
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
|
|
import { startGatewayMaintenanceTimers } from "./server-maintenance.js";
|
|
import { GATEWAY_EVENTS, listGatewayMethods } from "./server-methods-list.js";
|
|
import { coreGatewayHandlers } from "./server-methods.js";
|
|
import { createExecApprovalHandlers } from "./server-methods/exec-approval.js";
|
|
import { safeParseJson } from "./server-methods/nodes.helpers.js";
|
|
import { hasConnectedMobileNode } from "./server-mobile-nodes.js";
|
|
import { loadGatewayModelCatalog } from "./server-model-catalog.js";
|
|
import { createNodeSubscriptionManager } from "./server-node-subscriptions.js";
|
|
import { loadGatewayPlugins } from "./server-plugins.js";
|
|
import { createGatewayReloadHandlers } from "./server-reload-handlers.js";
|
|
import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js";
|
|
import { createGatewayRuntimeState } from "./server-runtime-state.js";
|
|
import { resolveSessionKeyForRun } from "./server-session-key.js";
|
|
import { logGatewayStartup } from "./server-startup-log.js";
|
|
import { startGatewaySidecars } from "./server-startup.js";
|
|
import { startGatewayTailscaleExposure } from "./server-tailscale.js";
|
|
import { createWizardSessionTracker } from "./server-wizard-sessions.js";
|
|
import { attachGatewayWsHandlers } from "./server-ws-runtime.js";
|
|
import {
|
|
getHealthCache,
|
|
getHealthVersion,
|
|
getPresenceVersion,
|
|
incrementPresenceVersion,
|
|
refreshGatewayHealthSnapshot,
|
|
} from "./server/health-state.js";
|
|
import { loadGatewayTlsRuntime } from "./server/tls.js";
|
|
|
|
export { __resetModelCatalogCacheForTest } from "./server-model-catalog.js";
|
|
|
|
ensureOpenClawCliOnPath();
|
|
|
|
const log = createSubsystemLogger("gateway");
|
|
const logCanvas = log.child("canvas");
|
|
const logDiscovery = log.child("discovery");
|
|
const logTailscale = log.child("tailscale");
|
|
const logChannels = log.child("channels");
|
|
const logBrowser = log.child("browser");
|
|
const logHealth = log.child("health");
|
|
const logCron = log.child("cron");
|
|
const logReload = log.child("reload");
|
|
const logHooks = log.child("hooks");
|
|
const logPlugins = log.child("plugins");
|
|
const logWsControl = log.child("ws");
|
|
const gatewayRuntime = runtimeForLogger(log);
|
|
const canvasRuntime = runtimeForLogger(logCanvas);
|
|
|
|
export type GatewayServer = {
|
|
close: (opts?: { reason?: string; restartExpectedMs?: number | null }) => Promise<void>;
|
|
};
|
|
|
|
export type GatewayServerOptions = {
|
|
/**
|
|
* Bind address policy for the Gateway WebSocket/HTTP server.
|
|
* - loopback: 127.0.0.1
|
|
* - lan: 0.0.0.0
|
|
* - tailnet: bind only to the Tailscale IPv4 address (100.64.0.0/10)
|
|
* - auto: prefer loopback, else LAN
|
|
*/
|
|
bind?: import("../config/config.js").GatewayBindMode;
|
|
/**
|
|
* Advanced override for the bind host, bypassing bind resolution.
|
|
* Prefer `bind` unless you really need a specific address.
|
|
*/
|
|
host?: string;
|
|
/**
|
|
* If false, do not serve the browser Control UI.
|
|
* Default: config `gateway.controlUi.enabled` (or true when absent).
|
|
*/
|
|
controlUiEnabled?: boolean;
|
|
/**
|
|
* If false, do not serve `POST /v1/chat/completions`.
|
|
* Default: config `gateway.http.endpoints.chatCompletions.enabled` (or false when absent).
|
|
*/
|
|
openAiChatCompletionsEnabled?: boolean;
|
|
/**
|
|
* If false, do not serve `POST /v1/responses` (OpenResponses API).
|
|
* Default: config `gateway.http.endpoints.responses.enabled` (or false when absent).
|
|
*/
|
|
openResponsesEnabled?: boolean;
|
|
/**
|
|
* Override gateway auth configuration (merges with config).
|
|
*/
|
|
auth?: import("../config/config.js").GatewayAuthConfig;
|
|
/**
|
|
* Override gateway Tailscale exposure configuration (merges with config).
|
|
*/
|
|
tailscale?: import("../config/config.js").GatewayTailscaleConfig;
|
|
/**
|
|
* Test-only: allow canvas host startup even when NODE_ENV/VITEST would disable it.
|
|
*/
|
|
allowCanvasHostInTests?: boolean;
|
|
/**
|
|
* Test-only: override the onboarding wizard runner.
|
|
*/
|
|
wizardRunner?: (
|
|
opts: import("../commands/onboard-types.js").OnboardOptions,
|
|
runtime: import("../runtime.js").RuntimeEnv,
|
|
prompter: import("../wizard/prompts.js").WizardPrompter,
|
|
) => Promise<void>;
|
|
};
|
|
|
|
export async function startGatewayServer(
|
|
port = 18789,
|
|
opts: GatewayServerOptions = {},
|
|
): Promise<GatewayServer> {
|
|
const minimalTestGateway =
|
|
process.env.VITEST === "1" && process.env.OPENCLAW_TEST_MINIMAL_GATEWAY === "1";
|
|
|
|
// Ensure all default port derivations (browser/canvas) see the actual runtime port.
|
|
process.env.OPENCLAW_GATEWAY_PORT = String(port);
|
|
logAcceptedEnvOption({
|
|
key: "OPENCLAW_RAW_STREAM",
|
|
description: "raw stream logging enabled",
|
|
});
|
|
logAcceptedEnvOption({
|
|
key: "OPENCLAW_RAW_STREAM_PATH",
|
|
description: "raw stream log path override",
|
|
});
|
|
|
|
let configSnapshot = await readConfigFileSnapshot();
|
|
if (configSnapshot.legacyIssues.length > 0) {
|
|
if (isNixMode) {
|
|
throw new Error(
|
|
"Legacy config entries detected while running in Nix mode. Update your Nix config to the latest schema and restart.",
|
|
);
|
|
}
|
|
const { config: migrated, changes } = migrateLegacyConfig(configSnapshot.parsed);
|
|
if (!migrated) {
|
|
throw new Error(
|
|
`Legacy config entries detected but auto-migration failed. Run "${formatCliCommand("openclaw doctor")}" to migrate.`,
|
|
);
|
|
}
|
|
await writeConfigFile(migrated);
|
|
if (changes.length > 0) {
|
|
log.info(
|
|
`gateway: migrated legacy config entries:\n${changes
|
|
.map((entry) => `- ${entry}`)
|
|
.join("\n")}`,
|
|
);
|
|
}
|
|
}
|
|
|
|
configSnapshot = await readConfigFileSnapshot();
|
|
if (configSnapshot.exists && !configSnapshot.valid) {
|
|
const issues =
|
|
configSnapshot.issues.length > 0
|
|
? configSnapshot.issues
|
|
.map((issue) => `${issue.path || "<root>"}: ${issue.message}`)
|
|
.join("\n")
|
|
: "Unknown validation issue.";
|
|
throw new Error(
|
|
`Invalid config at ${configSnapshot.path}.\n${issues}\nRun "${formatCliCommand("openclaw doctor")}" to repair, then retry.`,
|
|
);
|
|
}
|
|
|
|
const autoEnable = applyPluginAutoEnable({ config: configSnapshot.config, env: process.env });
|
|
if (autoEnable.changes.length > 0) {
|
|
try {
|
|
await writeConfigFile(autoEnable.config);
|
|
log.info(
|
|
`gateway: auto-enabled plugins:\n${autoEnable.changes
|
|
.map((entry) => `- ${entry}`)
|
|
.join("\n")}`,
|
|
);
|
|
} catch (err) {
|
|
log.warn(`gateway: failed to persist plugin auto-enable changes: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
const cfgAtStart = loadConfig();
|
|
const diagnosticsEnabled = isDiagnosticsEnabled(cfgAtStart);
|
|
if (diagnosticsEnabled) {
|
|
startDiagnosticHeartbeat();
|
|
}
|
|
setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true });
|
|
setPreRestartDeferralCheck(
|
|
() => getTotalQueueSize() + getTotalPendingReplies() + getActiveEmbeddedRunCount(),
|
|
);
|
|
initSubagentRegistry();
|
|
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
|
|
const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId);
|
|
const baseMethods = listGatewayMethods();
|
|
const emptyPluginRegistry: PluginRegistry = {
|
|
plugins: [],
|
|
tools: [],
|
|
hooks: [],
|
|
typedHooks: [],
|
|
channels: [],
|
|
providers: [],
|
|
gatewayHandlers: {},
|
|
httpHandlers: [],
|
|
httpRoutes: [],
|
|
cliRegistrars: [],
|
|
services: [],
|
|
commands: [],
|
|
diagnostics: [],
|
|
};
|
|
const { pluginRegistry, gatewayMethods: baseGatewayMethods } = minimalTestGateway
|
|
? { pluginRegistry: emptyPluginRegistry, gatewayMethods: baseMethods }
|
|
: loadGatewayPlugins({
|
|
cfg: cfgAtStart,
|
|
workspaceDir: defaultWorkspaceDir,
|
|
log,
|
|
coreGatewayHandlers,
|
|
baseMethods,
|
|
});
|
|
const channelLogs = Object.fromEntries(
|
|
listChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),
|
|
) as Record<ChannelId, ReturnType<typeof createSubsystemLogger>>;
|
|
const channelRuntimeEnvs = Object.fromEntries(
|
|
Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]),
|
|
) as Record<ChannelId, RuntimeEnv>;
|
|
const channelMethods = listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []);
|
|
const gatewayMethods = Array.from(new Set([...baseGatewayMethods, ...channelMethods]));
|
|
let pluginServices: PluginServicesHandle | null = null;
|
|
const runtimeConfig = await resolveGatewayRuntimeConfig({
|
|
cfg: cfgAtStart,
|
|
port,
|
|
bind: opts.bind,
|
|
host: opts.host,
|
|
controlUiEnabled: opts.controlUiEnabled,
|
|
openAiChatCompletionsEnabled: opts.openAiChatCompletionsEnabled,
|
|
openResponsesEnabled: opts.openResponsesEnabled,
|
|
auth: opts.auth,
|
|
tailscale: opts.tailscale,
|
|
});
|
|
const {
|
|
bindHost,
|
|
controlUiEnabled,
|
|
openAiChatCompletionsEnabled,
|
|
openResponsesEnabled,
|
|
openResponsesConfig,
|
|
controlUiBasePath,
|
|
controlUiRoot: controlUiRootOverride,
|
|
resolvedAuth,
|
|
tailscaleConfig,
|
|
tailscaleMode,
|
|
} = runtimeConfig;
|
|
let hooksConfig = runtimeConfig.hooksConfig;
|
|
const canvasHostEnabled = runtimeConfig.canvasHostEnabled;
|
|
|
|
// Create auth rate limiter only when explicitly configured.
|
|
const rateLimitConfig = cfgAtStart.gateway?.auth?.rateLimit;
|
|
const authRateLimiter: AuthRateLimiter | undefined = rateLimitConfig
|
|
? createAuthRateLimiter(rateLimitConfig)
|
|
: undefined;
|
|
|
|
let controlUiRootState: ControlUiRootState | undefined;
|
|
if (controlUiRootOverride) {
|
|
const resolvedOverride = resolveControlUiRootOverrideSync(controlUiRootOverride);
|
|
const resolvedOverridePath = path.resolve(controlUiRootOverride);
|
|
controlUiRootState = resolvedOverride
|
|
? { kind: "resolved", path: resolvedOverride }
|
|
: { kind: "invalid", path: resolvedOverridePath };
|
|
if (!resolvedOverride) {
|
|
log.warn(`gateway: controlUi.root not found at ${resolvedOverridePath}`);
|
|
}
|
|
} else if (controlUiEnabled) {
|
|
let resolvedRoot = resolveControlUiRootSync({
|
|
moduleUrl: import.meta.url,
|
|
argv1: process.argv[1],
|
|
cwd: process.cwd(),
|
|
});
|
|
if (!resolvedRoot) {
|
|
const ensureResult = await ensureControlUiAssetsBuilt(gatewayRuntime);
|
|
if (!ensureResult.ok && ensureResult.message) {
|
|
log.warn(`gateway: ${ensureResult.message}`);
|
|
}
|
|
resolvedRoot = resolveControlUiRootSync({
|
|
moduleUrl: import.meta.url,
|
|
argv1: process.argv[1],
|
|
cwd: process.cwd(),
|
|
});
|
|
}
|
|
controlUiRootState = resolvedRoot
|
|
? { kind: "resolved", path: resolvedRoot }
|
|
: { kind: "missing" };
|
|
}
|
|
|
|
const wizardRunner = opts.wizardRunner ?? runOnboardingWizard;
|
|
const { wizardSessions, findRunningWizard, purgeWizardSession } = createWizardSessionTracker();
|
|
|
|
const deps = createDefaultDeps();
|
|
let canvasHostServer: CanvasHostServer | null = null;
|
|
const gatewayTls = await loadGatewayTlsRuntime(cfgAtStart.gateway?.tls, log.child("tls"));
|
|
if (cfgAtStart.gateway?.tls?.enabled && !gatewayTls.enabled) {
|
|
throw new Error(gatewayTls.error ?? "gateway tls: failed to enable");
|
|
}
|
|
const {
|
|
canvasHost,
|
|
httpServer,
|
|
httpServers,
|
|
httpBindHosts,
|
|
wss,
|
|
clients,
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
agentRunSeq,
|
|
dedupe,
|
|
chatRunState,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
addChatRun,
|
|
removeChatRun,
|
|
chatAbortControllers,
|
|
toolEventRecipients,
|
|
} = await createGatewayRuntimeState({
|
|
cfg: cfgAtStart,
|
|
bindHost,
|
|
port,
|
|
controlUiEnabled,
|
|
controlUiBasePath,
|
|
controlUiRoot: controlUiRootState,
|
|
openAiChatCompletionsEnabled,
|
|
openResponsesEnabled,
|
|
openResponsesConfig,
|
|
resolvedAuth,
|
|
rateLimiter: authRateLimiter,
|
|
gatewayTls,
|
|
hooksConfig: () => hooksConfig,
|
|
pluginRegistry,
|
|
deps,
|
|
canvasRuntime,
|
|
canvasHostEnabled,
|
|
allowCanvasHostInTests: opts.allowCanvasHostInTests,
|
|
logCanvas,
|
|
log,
|
|
logHooks,
|
|
logPlugins,
|
|
});
|
|
let bonjourStop: (() => Promise<void>) | null = null;
|
|
const nodeRegistry = new NodeRegistry();
|
|
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
|
|
const nodeSubscriptions = createNodeSubscriptionManager();
|
|
const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => {
|
|
const payload = safeParseJson(opts.payloadJSON ?? null);
|
|
nodeRegistry.sendEvent(opts.nodeId, opts.event, payload);
|
|
};
|
|
const nodeSendToSession = (sessionKey: string, event: string, payload: unknown) =>
|
|
nodeSubscriptions.sendToSession(sessionKey, event, payload, nodeSendEvent);
|
|
const nodeSendToAllSubscribed = (event: string, payload: unknown) =>
|
|
nodeSubscriptions.sendToAllSubscribed(event, payload, nodeSendEvent);
|
|
const nodeSubscribe = nodeSubscriptions.subscribe;
|
|
const nodeUnsubscribe = nodeSubscriptions.unsubscribe;
|
|
const nodeUnsubscribeAll = nodeSubscriptions.unsubscribeAll;
|
|
const broadcastVoiceWakeChanged = (triggers: string[]) => {
|
|
broadcast("voicewake.changed", { triggers }, { dropIfSlow: true });
|
|
};
|
|
const hasMobileNodeConnected = () => hasConnectedMobileNode(nodeRegistry);
|
|
applyGatewayLaneConcurrency(cfgAtStart);
|
|
|
|
let cronState = buildGatewayCronService({
|
|
cfg: cfgAtStart,
|
|
deps,
|
|
broadcast,
|
|
});
|
|
let { cron, storePath: cronStorePath } = cronState;
|
|
|
|
const channelManager = createChannelManager({
|
|
loadConfig,
|
|
channelLogs,
|
|
channelRuntimeEnvs,
|
|
});
|
|
const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } =
|
|
channelManager;
|
|
|
|
if (!minimalTestGateway) {
|
|
const machineDisplayName = await getMachineDisplayName();
|
|
const discovery = await startGatewayDiscovery({
|
|
machineDisplayName,
|
|
port,
|
|
gatewayTls: gatewayTls.enabled
|
|
? { enabled: true, fingerprintSha256: gatewayTls.fingerprintSha256 }
|
|
: undefined,
|
|
wideAreaDiscoveryEnabled: cfgAtStart.discovery?.wideArea?.enabled === true,
|
|
wideAreaDiscoveryDomain: cfgAtStart.discovery?.wideArea?.domain,
|
|
tailscaleMode,
|
|
mdnsMode: cfgAtStart.discovery?.mdns?.mode,
|
|
logDiscovery,
|
|
});
|
|
bonjourStop = discovery.bonjourStop;
|
|
}
|
|
|
|
if (!minimalTestGateway) {
|
|
setSkillsRemoteRegistry(nodeRegistry);
|
|
void primeRemoteSkillsCache();
|
|
}
|
|
// Debounce skills-triggered node probes to avoid feedback loops and rapid-fire invokes.
|
|
// Skills changes can happen in bursts (e.g., file watcher events), and each probe
|
|
// takes time to complete. A 30-second delay ensures we batch changes together.
|
|
let skillsRefreshTimer: ReturnType<typeof setTimeout> | null = null;
|
|
const skillsRefreshDelayMs = 30_000;
|
|
const skillsChangeUnsub = minimalTestGateway
|
|
? () => {}
|
|
: registerSkillsChangeListener((event) => {
|
|
if (event.reason === "remote-node") {
|
|
return;
|
|
}
|
|
if (skillsRefreshTimer) {
|
|
clearTimeout(skillsRefreshTimer);
|
|
}
|
|
skillsRefreshTimer = setTimeout(() => {
|
|
skillsRefreshTimer = null;
|
|
const latest = loadConfig();
|
|
void refreshRemoteBinsForConnectedNodes(latest);
|
|
}, skillsRefreshDelayMs);
|
|
});
|
|
|
|
const noopInterval = () => setInterval(() => {}, 1 << 30);
|
|
let tickInterval = noopInterval();
|
|
let healthInterval = noopInterval();
|
|
let dedupeCleanup = noopInterval();
|
|
if (!minimalTestGateway) {
|
|
({ tickInterval, healthInterval, dedupeCleanup } = startGatewayMaintenanceTimers({
|
|
broadcast,
|
|
nodeSendToAllSubscribed,
|
|
getPresenceVersion,
|
|
getHealthVersion,
|
|
refreshGatewayHealthSnapshot,
|
|
logHealth,
|
|
dedupe,
|
|
chatAbortControllers,
|
|
chatRunState,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
removeChatRun,
|
|
agentRunSeq,
|
|
nodeSendToSession,
|
|
}));
|
|
}
|
|
|
|
const agentUnsub = minimalTestGateway
|
|
? null
|
|
: onAgentEvent(
|
|
createAgentEventHandler({
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
nodeSendToSession,
|
|
agentRunSeq,
|
|
chatRunState,
|
|
resolveSessionKeyForRun,
|
|
clearAgentRunContext,
|
|
toolEventRecipients,
|
|
}),
|
|
);
|
|
|
|
const heartbeatUnsub = minimalTestGateway
|
|
? null
|
|
: onHeartbeatEvent((evt) => {
|
|
broadcast("heartbeat", evt, { dropIfSlow: true });
|
|
});
|
|
|
|
let heartbeatRunner: HeartbeatRunner = minimalTestGateway
|
|
? {
|
|
stop: () => {},
|
|
updateConfig: () => {},
|
|
}
|
|
: startHeartbeatRunner({ cfg: cfgAtStart });
|
|
|
|
if (!minimalTestGateway) {
|
|
void cron.start().catch((err) => logCron.error(`failed to start: ${String(err)}`));
|
|
}
|
|
|
|
// Recover pending outbound deliveries from previous crash/restart.
|
|
if (!minimalTestGateway) {
|
|
void (async () => {
|
|
const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js");
|
|
const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js");
|
|
const logRecovery = log.child("delivery-recovery");
|
|
await recoverPendingDeliveries({
|
|
deliver: deliverOutboundPayloads,
|
|
log: logRecovery,
|
|
cfg: cfgAtStart,
|
|
});
|
|
})().catch((err) => log.error(`Delivery recovery failed: ${String(err)}`));
|
|
}
|
|
|
|
const execApprovalManager = new ExecApprovalManager();
|
|
const execApprovalForwarder = createExecApprovalForwarder();
|
|
const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, {
|
|
forwarder: execApprovalForwarder,
|
|
});
|
|
|
|
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
|
|
|
|
attachGatewayWsHandlers({
|
|
wss,
|
|
clients,
|
|
port,
|
|
gatewayHost: bindHost ?? undefined,
|
|
canvasHostEnabled: Boolean(canvasHost),
|
|
canvasHostServerPort,
|
|
resolvedAuth,
|
|
rateLimiter: authRateLimiter,
|
|
gatewayMethods,
|
|
events: GATEWAY_EVENTS,
|
|
logGateway: log,
|
|
logHealth,
|
|
logWsControl,
|
|
extraHandlers: {
|
|
...pluginRegistry.gatewayHandlers,
|
|
...execApprovalHandlers,
|
|
},
|
|
broadcast,
|
|
context: {
|
|
deps,
|
|
cron,
|
|
cronStorePath,
|
|
execApprovalManager,
|
|
loadGatewayModelCatalog,
|
|
getHealthCache,
|
|
refreshHealthSnapshot: refreshGatewayHealthSnapshot,
|
|
logHealth,
|
|
logGateway: log,
|
|
incrementPresenceVersion,
|
|
getHealthVersion,
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
nodeSendToSession,
|
|
nodeSendToAllSubscribed,
|
|
nodeSubscribe,
|
|
nodeUnsubscribe,
|
|
nodeUnsubscribeAll,
|
|
hasConnectedMobileNode: hasMobileNodeConnected,
|
|
nodeRegistry,
|
|
agentRunSeq,
|
|
chatAbortControllers,
|
|
chatAbortedRuns: chatRunState.abortedRuns,
|
|
chatRunBuffers: chatRunState.buffers,
|
|
chatDeltaSentAt: chatRunState.deltaSentAt,
|
|
addChatRun,
|
|
removeChatRun,
|
|
registerToolEventRecipient: toolEventRecipients.add,
|
|
dedupe,
|
|
wizardSessions,
|
|
findRunningWizard,
|
|
purgeWizardSession,
|
|
getRuntimeSnapshot,
|
|
startChannel,
|
|
stopChannel,
|
|
markChannelLoggedOut,
|
|
wizardRunner,
|
|
broadcastVoiceWakeChanged,
|
|
},
|
|
});
|
|
logGatewayStartup({
|
|
cfg: cfgAtStart,
|
|
bindHost,
|
|
bindHosts: httpBindHosts,
|
|
port,
|
|
tlsEnabled: gatewayTls.enabled,
|
|
log,
|
|
isNixMode,
|
|
});
|
|
if (!minimalTestGateway) {
|
|
scheduleGatewayUpdateCheck({ cfg: cfgAtStart, log, isNixMode });
|
|
}
|
|
const tailscaleCleanup = minimalTestGateway
|
|
? null
|
|
: await startGatewayTailscaleExposure({
|
|
tailscaleMode,
|
|
resetOnExit: tailscaleConfig.resetOnExit,
|
|
port,
|
|
controlUiBasePath,
|
|
logTailscale,
|
|
});
|
|
|
|
let browserControl: Awaited<ReturnType<typeof startBrowserControlServerIfEnabled>> = null;
|
|
if (!minimalTestGateway) {
|
|
({ browserControl, pluginServices } = await startGatewaySidecars({
|
|
cfg: cfgAtStart,
|
|
pluginRegistry,
|
|
defaultWorkspaceDir,
|
|
deps,
|
|
startChannels,
|
|
log,
|
|
logHooks,
|
|
logChannels,
|
|
logBrowser,
|
|
}));
|
|
}
|
|
|
|
// Run gateway_start plugin hook (fire-and-forget)
|
|
if (!minimalTestGateway) {
|
|
const hookRunner = getGlobalHookRunner();
|
|
if (hookRunner?.hasHooks("gateway_start")) {
|
|
void hookRunner.runGatewayStart({ port }, { port }).catch((err) => {
|
|
log.warn(`gateway_start hook failed: ${String(err)}`);
|
|
});
|
|
}
|
|
}
|
|
|
|
const configReloader = minimalTestGateway
|
|
? { stop: async () => {} }
|
|
: (() => {
|
|
const { applyHotReload, requestGatewayRestart } = createGatewayReloadHandlers({
|
|
deps,
|
|
broadcast,
|
|
getState: () => ({
|
|
hooksConfig,
|
|
heartbeatRunner,
|
|
cronState,
|
|
browserControl,
|
|
}),
|
|
setState: (nextState) => {
|
|
hooksConfig = nextState.hooksConfig;
|
|
heartbeatRunner = nextState.heartbeatRunner;
|
|
cronState = nextState.cronState;
|
|
cron = cronState.cron;
|
|
cronStorePath = cronState.storePath;
|
|
browserControl = nextState.browserControl;
|
|
},
|
|
startChannel,
|
|
stopChannel,
|
|
logHooks,
|
|
logBrowser,
|
|
logChannels,
|
|
logCron,
|
|
logReload,
|
|
});
|
|
|
|
return startGatewayConfigReloader({
|
|
initialConfig: cfgAtStart,
|
|
readSnapshot: readConfigFileSnapshot,
|
|
onHotReload: applyHotReload,
|
|
onRestart: requestGatewayRestart,
|
|
log: {
|
|
info: (msg) => logReload.info(msg),
|
|
warn: (msg) => logReload.warn(msg),
|
|
error: (msg) => logReload.error(msg),
|
|
},
|
|
watchPath: CONFIG_PATH,
|
|
});
|
|
})();
|
|
|
|
const close = createGatewayCloseHandler({
|
|
bonjourStop,
|
|
tailscaleCleanup,
|
|
canvasHost,
|
|
canvasHostServer,
|
|
stopChannel,
|
|
pluginServices,
|
|
cron,
|
|
heartbeatRunner,
|
|
nodePresenceTimers,
|
|
broadcast,
|
|
tickInterval,
|
|
healthInterval,
|
|
dedupeCleanup,
|
|
agentUnsub,
|
|
heartbeatUnsub,
|
|
chatRunState,
|
|
clients,
|
|
configReloader,
|
|
browserControl,
|
|
wss,
|
|
httpServer,
|
|
httpServers,
|
|
});
|
|
|
|
return {
|
|
close: async (opts) => {
|
|
// Run gateway_stop plugin hook before shutdown
|
|
{
|
|
const hookRunner = getGlobalHookRunner();
|
|
if (hookRunner?.hasHooks("gateway_stop")) {
|
|
try {
|
|
await hookRunner.runGatewayStop(
|
|
{ reason: opts?.reason ?? "gateway stopping" },
|
|
{ port },
|
|
);
|
|
} catch (err) {
|
|
log.warn(`gateway_stop hook failed: ${String(err)}`);
|
|
}
|
|
}
|
|
}
|
|
if (diagnosticsEnabled) {
|
|
stopDiagnosticHeartbeat();
|
|
}
|
|
if (skillsRefreshTimer) {
|
|
clearTimeout(skillsRefreshTimer);
|
|
skillsRefreshTimer = null;
|
|
}
|
|
skillsChangeUnsub();
|
|
authRateLimiter?.dispose();
|
|
await close(opts);
|
|
},
|
|
};
|
|
}
|