import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js";
import {
  consumeGatewaySigusr1RestartAuthorization,
  isGatewaySigusr1RestartExternallyAllowed,
  markGatewaySigusr1RestartHandled,
} from "../../infra/restart.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import {
  getActiveTaskCount,
  resetAllLanes,
  waitForActiveTasks,
} from "../../process/command-queue.js";
import { createRestartIterationHook } from "../../process/restart-recovery.js";
import type { defaultRuntime } from "../../runtime.js";

const gatewayLog = createSubsystemLogger("gateway");

type GatewayRunSignalAction = "stop" | "restart";

/** The server handle resolved by `startGatewayServer`. */
type GatewayServer = Awaited<ReturnType<typeof startGatewayServer>>;

/** Maximum time a restart waits for in-flight command-queue tasks to finish. */
const DRAIN_TIMEOUT_MS = 30_000;
/** Budget for closing the server and releasing the lock before the force-exit timer fires. */
const SHUTDOWN_TIMEOUT_MS = 5_000;

/**
 * Before a restart, waits (bounded by DRAIN_TIMEOUT_MS) for in-flight agent
 * turns tracked by the command queue, so buffered messages are delivered
 * before the server is torn down. Proceeds anyway when the wait times out.
 */
async function drainActiveTasksBeforeRestart(): Promise<void> {
  const activeTasks = getActiveTaskCount();
  if (activeTasks > 0) {
    gatewayLog.info(
      `draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
    );
    const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS);
    if (drained) {
      gatewayLog.info("all active tasks drained");
    } else {
      gatewayLog.warn("drain timeout reached; proceeding with restart");
    }
  }
}

/**
 * Runs the gateway server under a signal-driven supervision loop.
 *
 * - SIGTERM / SIGINT: close the server, release the gateway lock, remove the
 *   signal handlers, then call `runtime.exit(0)`.
 * - SIGUSR1: honored only when a restart authorization was consumed or
 *   external SIGUSR1 restarts are allowed. A restart drains active tasks,
 *   closes the server, and then attempts a full process respawn. When the
 *   respawn reports "spawned" or "supervised", this process exits. Otherwise
 *   the loop starts a fresh server in-process.
 * - If shutdown does not finish within its budget, a timer calls
 *   `runtime.exit(0)` without releasing the lock.
 *
 * @param params.start Starts a gateway server; called once per loop iteration
 *   (the initial start and every in-process restart).
 * @param params.runtime Runtime whose `exit` is used to terminate the process.
 * @returns Never resolves during normal operation. It rejects if `start`
 *   throws, after releasing the lock and removing the signal handlers.
 */
export async function runGatewayLoop(params: {
  start: () => Promise<GatewayServer>;
  runtime: typeof defaultRuntime;
}) {
  const lock = await acquireGatewayLock();
  let server: GatewayServer | null = null;
  let shuttingDown = false;
  // Resolves the current iteration's wait, letting the loop start a new server.
  let restartResolver: (() => void) | null = null;

  const cleanupSignals = () => {
    process.removeListener("SIGTERM", onSigterm);
    process.removeListener("SIGINT", onSigint);
    process.removeListener("SIGUSR1", onSigusr1);
  };

  // Best-effort: a failing lock release must not stop the process from
  // exiting, nor mask an error that is already propagating.
  const releaseLock = async () => {
    try {
      await lock?.release();
    } catch (err) {
      gatewayLog.warn(`failed to release gateway lock: ${String(err)}`);
    }
  };

  const releaseAndExit = async () => {
    await releaseLock();
    cleanupSignals();
    params.runtime.exit(0);
  };

  const request = (action: GatewayRunSignalAction, signal: string) => {
    if (shuttingDown) {
      gatewayLog.info(`received ${signal} during shutdown; ignoring`);
      return;
    }
    shuttingDown = true;
    const isRestart = action === "restart";
    gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);

    // Allow extra time for draining active turns on restart.
    const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
    const forceExitTimer = setTimeout(() => {
      gatewayLog.error("shutdown timed out; exiting without full cleanup");
      cleanupSignals();
      params.runtime.exit(0);
    }, forceExitMs);

    void (async () => {
      try {
        // On restart, wait for in-flight agent turns to finish before
        // tearing down the server so buffered messages are delivered.
        if (isRestart) {
          await drainActiveTasksBeforeRestart();
        }
        await server?.close({
          reason: isRestart ? "gateway restarting" : "gateway stopping",
          restartExpectedMs: isRestart ? 1500 : null,
        });
      } catch (err) {
        gatewayLog.error(`shutdown error: ${String(err)}`);
      } finally {
        clearTimeout(forceExitTimer);
        server = null;
        if (isRestart) {
          const respawn = restartGatewayProcessWithFreshPid();
          if (respawn.mode === "spawned" || respawn.mode === "supervised") {
            const modeLabel =
              respawn.mode === "spawned"
                ? `spawned pid ${respawn.pid ?? "unknown"}`
                : "supervisor restart";
            gatewayLog.info(`restart mode: full process restart (${modeLabel})`);
            await releaseAndExit();
          } else {
            if (respawn.mode === "failed") {
              gatewayLog.warn(
                `full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
              );
            } else {
              gatewayLog.info("restart mode: in-process restart (OPENCLAW_NO_RESPAWN)");
            }
            shuttingDown = false;
            restartResolver?.();
          }
        } else {
          await releaseAndExit();
        }
      }
    })().catch((err: unknown) => {
      // The force-exit timer is already cleared when the finalize path runs.
      // Without this handler, a throw there (e.g. from the respawn helper)
      // would be an unhandled rejection, and `shuttingDown` would stay true,
      // so every later signal would be ignored. Exit explicitly instead.
      gatewayLog.error(`shutdown finalization failed: ${String(err)}`);
      cleanupSignals();
      params.runtime.exit(1);
    });
  };

  const onSigterm = () => {
    gatewayLog.info("signal SIGTERM received");
    request("stop", "SIGTERM");
  };
  const onSigint = () => {
    gatewayLog.info("signal SIGINT received");
    request("stop", "SIGINT");
  };
  const onSigusr1 = () => {
    gatewayLog.info("signal SIGUSR1 received");
    const authorized = consumeGatewaySigusr1RestartAuthorization();
    if (!authorized && !isGatewaySigusr1RestartExternallyAllowed()) {
      gatewayLog.warn(
        "SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
      );
      return;
    }
    markGatewaySigusr1RestartHandled();
    request("restart", "SIGUSR1");
  };

  process.on("SIGTERM", onSigterm);
  process.on("SIGINT", onSigint);
  process.on("SIGUSR1", onSigusr1);

  try {
    const onIteration = createRestartIterationHook(() => {
      // After an in-process restart (SIGUSR1), reset command-queue lane state.
      // Interrupted tasks from the previous lifecycle may have left `active`
      // counts elevated (their finally blocks never ran), permanently blocking
      // new work from draining. This must happen here — at the restart
      // coordinator level — rather than inside individual subsystem init
      // functions, to avoid surprising cross-cutting side effects.
      resetAllLanes();
    });

    // Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
    // SIGTERM/SIGINT still exit after a graceful shutdown.
    // eslint-disable-next-line no-constant-condition
    while (true) {
      onIteration();
      server = await params.start();
      await new Promise<void>((resolve) => {
        restartResolver = resolve;
      });
    }
  } finally {
    await releaseLock();
    cleanupSignals();
  }
}