diff --git a/src/cli/daemon-cli/lifecycle-core.ts b/src/cli/daemon-cli/lifecycle-core.ts index 5e935bb8d..94707a43e 100644 --- a/src/cli/daemon-cli/lifecycle-core.ts +++ b/src/cli/daemon-cli/lifecycle-core.ts @@ -1,3 +1,4 @@ +import type { Writable } from "node:stream"; import { loadConfig } from "../../config/config.js"; import { resolveIsNixMode } from "../../config/paths.js"; import { checkTokenDrift } from "../../daemon/service-audit.js"; @@ -18,6 +19,13 @@ type DaemonLifecycleOptions = { json?: boolean; }; +type RestartPostCheckContext = { + json: boolean; + stdout: Writable; + warnings: string[]; + fail: (message: string, hints?: string[]) => void; +}; + async function maybeAugmentSystemdHints(hints: string[]): Promise { if (process.platform !== "linux") { return hints; @@ -240,6 +248,7 @@ export async function runServiceRestart(params: { renderStartHints: () => string[]; opts?: DaemonLifecycleOptions; checkTokenDrift?: boolean; + postRestartCheck?: (ctx: RestartPostCheckContext) => Promise; }): Promise { const json = Boolean(params.opts?.json); const { stdout, emit, fail } = createActionIO({ action: "restart", json }); @@ -295,6 +304,9 @@ export async function runServiceRestart(params: { try { await params.service.restart({ env: process.env, stdout }); + if (params.postRestartCheck) { + await params.postRestartCheck({ json, stdout, warnings, fail }); + } let restarted = true; try { restarted = await params.service.isLoaded({ env: process.env }); diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts new file mode 100644 index 000000000..ef0cf5aaa --- /dev/null +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -0,0 +1,131 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +type RestartHealthSnapshot = { + healthy: boolean; + staleGatewayPids: number[]; + runtime: { status?: string }; + portUsage: { port: number; status: string; listeners: []; hints: []; errors?: string[] }; +}; + +type RestartPostCheckContext = { + json: boolean; + stdout: NodeJS.WritableStream; + warnings: string[]; + fail: (message: string, hints?: string[]) => void; +}; + +type RestartParams = { + opts?: { json?: boolean }; + postRestartCheck?: (ctx: RestartPostCheckContext) => Promise; +}; + +const service = { + readCommand: vi.fn(), + restart: vi.fn(), +}; + +const runServiceRestart = vi.fn(); +const waitForGatewayHealthyRestart = vi.fn(); +const terminateStaleGatewayPids = vi.fn(); +const renderRestartDiagnostics = vi.fn(() => ["diag: unhealthy runtime"]); +const resolveGatewayPort = vi.fn(() => 18789); +const loadConfig = vi.fn(() => ({})); + +vi.mock("../../config/config.js", () => ({ + loadConfig: () => loadConfig(), + resolveGatewayPort, +})); + +vi.mock("../../daemon/service.js", () => ({ + resolveGatewayService: () => service, +})); + +vi.mock("./restart-health.js", () => ({ + waitForGatewayHealthyRestart, + terminateStaleGatewayPids, + renderRestartDiagnostics, +})); + +vi.mock("./lifecycle-core.js", () => ({ + runServiceRestart, + runServiceStart: vi.fn(), + runServiceStop: vi.fn(), + runServiceUninstall: vi.fn(), +})); + +describe("runDaemonRestart health checks", () => { + beforeEach(() => { + vi.resetModules(); + service.readCommand.mockReset(); + service.restart.mockReset(); + runServiceRestart.mockReset(); + waitForGatewayHealthyRestart.mockReset(); + terminateStaleGatewayPids.mockReset(); + renderRestartDiagnostics.mockClear(); + resolveGatewayPort.mockClear(); + loadConfig.mockClear(); + + service.readCommand.mockResolvedValue({ + programArguments: ["openclaw", "gateway", "--port", "18789"], + environment: {}, + }); + + runServiceRestart.mockImplementation(async (params: RestartParams) => { + const fail = (message: string, hints?: string[]) => { + const err = new Error(message) as Error & { hints?: string[] }; + err.hints = hints; + throw err; + }; + await params.postRestartCheck?.({ + json: Boolean(params.opts?.json), + stdout: process.stdout, + warnings: [], + fail, + }); + return true; + }); + }); + + it("kills stale gateway pids and retries restart", async () => { + const unhealthy: RestartHealthSnapshot = { + healthy: false, + staleGatewayPids: [1993], + runtime: { status: "stopped" }, + portUsage: { port: 18789, status: "busy", listeners: [], hints: [] }, + }; + const healthy: RestartHealthSnapshot = { + healthy: true, + staleGatewayPids: [], + runtime: { status: "running" }, + portUsage: { port: 18789, status: "busy", listeners: [], hints: [] }, + }; + waitForGatewayHealthyRestart.mockResolvedValueOnce(unhealthy).mockResolvedValueOnce(healthy); + terminateStaleGatewayPids.mockResolvedValue([1993]); + + const { runDaemonRestart } = await import("./lifecycle.js"); + const result = await runDaemonRestart({ json: true }); + + expect(result).toBe(true); + expect(terminateStaleGatewayPids).toHaveBeenCalledWith([1993]); + expect(service.restart).toHaveBeenCalledTimes(1); + expect(waitForGatewayHealthyRestart).toHaveBeenCalledTimes(2); + }); + + it("fails restart when gateway remains unhealthy", async () => { + const unhealthy: RestartHealthSnapshot = { + healthy: false, + staleGatewayPids: [], + runtime: { status: "stopped" }, + portUsage: { port: 18789, status: "free", listeners: [], hints: [] }, + }; + waitForGatewayHealthyRestart.mockResolvedValue(unhealthy); + + const { runDaemonRestart } = await import("./lifecycle.js"); + + await expect(runDaemonRestart({ json: true })).rejects.toMatchObject({ + message: "Gateway restart failed health checks.", + }); + expect(terminateStaleGatewayPids).not.toHaveBeenCalled(); + expect(renderRestartDiagnostics).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/cli/daemon-cli/lifecycle.ts b/src/cli/daemon-cli/lifecycle.ts index 1a0a8f387..e7749e9b2 100644 --- a/src/cli/daemon-cli/lifecycle.ts +++ b/src/cli/daemon-cli/lifecycle.ts @@ -1,13 +1,38 @@ +import { loadConfig, resolveGatewayPort } from "../../config/config.js"; import { resolveGatewayService } from "../../daemon/service.js"; +import { defaultRuntime } from "../../runtime.js"; +import { theme } from "../../terminal/theme.js"; +import { formatCliCommand } from "../command-format.js"; import { runServiceRestart, runServiceStart, runServiceStop, runServiceUninstall, } from "./lifecycle-core.js"; -import { renderGatewayServiceStartHints } from "./shared.js"; +import { + renderRestartDiagnostics, + terminateStaleGatewayPids, + waitForGatewayHealthyRestart, +} from "./restart-health.js"; +import { parsePortFromArgs, renderGatewayServiceStartHints } from "./shared.js"; import type { DaemonLifecycleOptions } from "./types.js"; +const POST_RESTART_HEALTH_ATTEMPTS = 8; +const POST_RESTART_HEALTH_DELAY_MS = 450; + +async function resolveGatewayRestartPort() { + const service = resolveGatewayService(); + const command = await service.readCommand(process.env).catch(() => null); + const serviceEnv = command?.environment ?? undefined; + const mergedEnv = { + ...(process.env as Record), + ...(serviceEnv ?? undefined), + } as NodeJS.ProcessEnv; + + const portFromArgs = parsePortFromArgs(command?.programArguments); + return portFromArgs ?? resolveGatewayPort(loadConfig(), mergedEnv); +} + export async function runDaemonUninstall(opts: DaemonLifecycleOptions = {}) { return await runServiceUninstall({ serviceNoun: "Gateway", @@ -41,11 +66,62 @@ export async function runDaemonStop(opts: DaemonLifecycleOptions = {}) { * Throws/exits on check or restart failures. */ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promise { + const json = Boolean(opts.json); + const service = resolveGatewayService(); + const restartPort = await resolveGatewayRestartPort().catch(() => + resolveGatewayPort(loadConfig(), process.env), + ); + return await runServiceRestart({ serviceNoun: "Gateway", - service: resolveGatewayService(), + service, renderStartHints: renderGatewayServiceStartHints, opts, checkTokenDrift: true, + postRestartCheck: async ({ warnings, fail, stdout }) => { + let health = await waitForGatewayHealthyRestart({ + service, + port: restartPort, + attempts: POST_RESTART_HEALTH_ATTEMPTS, + delayMs: POST_RESTART_HEALTH_DELAY_MS, + }); + + if (!health.healthy && health.staleGatewayPids.length > 0) { + const staleMsg = `Found stale gateway process(es): ${health.staleGatewayPids.join(", ")}.`; + warnings.push(staleMsg); + if (!json) { + defaultRuntime.log(theme.warn(staleMsg)); + defaultRuntime.log(theme.muted("Stopping stale process(es) and retrying restart...")); + } + + await terminateStaleGatewayPids(health.staleGatewayPids); + await service.restart({ env: process.env, stdout }); + health = await waitForGatewayHealthyRestart({ + service, + port: restartPort, + attempts: POST_RESTART_HEALTH_ATTEMPTS, + delayMs: POST_RESTART_HEALTH_DELAY_MS, + }); + } + + if (health.healthy) { + return; + } + + const diagnostics = renderRestartDiagnostics(health); + if (!json) { + defaultRuntime.log(theme.warn("Gateway did not become healthy after restart.")); + for (const line of diagnostics) { + defaultRuntime.log(theme.muted(line)); + } + } else { + warnings.push(...diagnostics); + } + + fail("Gateway restart failed health checks.", [ + formatCliCommand("openclaw gateway status --probe --deep"), + formatCliCommand("openclaw doctor"), + ]); + }, }); } diff --git a/src/cli/daemon-cli/restart-health.ts b/src/cli/daemon-cli/restart-health.ts new file mode 100644 index 000000000..b87e58646 --- /dev/null +++ b/src/cli/daemon-cli/restart-health.ts @@ -0,0 +1,172 @@ +import type { GatewayServiceRuntime } from "../../daemon/service-runtime.js"; +import type { GatewayService } from "../../daemon/service.js"; +import { + classifyPortListener, + formatPortDiagnostics, + inspectPortUsage, + type PortUsage, +} from "../../infra/ports.js"; +import { sleep } from "../../utils.js"; + +export const DEFAULT_RESTART_HEALTH_ATTEMPTS = 8; +export const DEFAULT_RESTART_HEALTH_DELAY_MS = 450; + +export type GatewayRestartSnapshot = { + runtime: GatewayServiceRuntime; + portUsage: PortUsage; + healthy: boolean; + staleGatewayPids: number[]; +}; + +export async function inspectGatewayRestart(params: { + service: GatewayService; + port: number; + env?: NodeJS.ProcessEnv; +}): Promise { + const env = params.env ?? process.env; + let runtime: GatewayServiceRuntime = { status: "unknown" }; + try { + runtime = await params.service.readRuntime(env); + } catch (err) { + runtime = { status: "unknown", detail: String(err) }; + } + + let portUsage: PortUsage; + try { + portUsage = await inspectPortUsage(params.port); + } catch (err) { + portUsage = { + port: params.port, + status: "unknown", + listeners: [], + hints: [], + errors: [String(err)], + }; + } + + const gatewayListeners = + portUsage.status === "busy" + ? portUsage.listeners.filter( + (listener) => classifyPortListener(listener, params.port) === "gateway", + ) + : []; + const running = runtime.status === "running"; + const ownsPort = + runtime.pid != null + ? portUsage.listeners.some((listener) => listener.pid === runtime.pid) + : gatewayListeners.length > 0 || + (portUsage.status === "busy" && portUsage.listeners.length === 0); + const healthy = running && ownsPort; + const staleGatewayPids = Array.from( + new Set( + gatewayListeners + .map((listener) => listener.pid) + .filter((pid): pid is number => Number.isFinite(pid)) + .filter((pid) => runtime.pid == null || pid !== runtime.pid || !running), + ), + ); + + return { + runtime, + portUsage, + healthy, + staleGatewayPids, + }; +} + +export async function waitForGatewayHealthyRestart(params: { + service: GatewayService; + port: number; + attempts?: number; + delayMs?: number; + env?: NodeJS.ProcessEnv; +}): Promise { + const attempts = params.attempts ?? DEFAULT_RESTART_HEALTH_ATTEMPTS; + const delayMs = params.delayMs ?? DEFAULT_RESTART_HEALTH_DELAY_MS; + + let snapshot = await inspectGatewayRestart({ + service: params.service, + port: params.port, + env: params.env, + }); + + for (let attempt = 0; attempt < attempts; attempt += 1) { + if (snapshot.healthy) { + return snapshot; + } + if (snapshot.staleGatewayPids.length > 0 && snapshot.runtime.status !== "running") { + return snapshot; + } + await sleep(delayMs); + snapshot = await inspectGatewayRestart({ + service: params.service, + port: params.port, + env: params.env, + }); + } + + return snapshot; +} + +export function renderRestartDiagnostics(snapshot: GatewayRestartSnapshot): string[] { + const lines: string[] = []; + const runtimeSummary = [ + snapshot.runtime.status ? `status=${snapshot.runtime.status}` : null, + snapshot.runtime.state ? `state=${snapshot.runtime.state}` : null, + snapshot.runtime.pid != null ? `pid=${snapshot.runtime.pid}` : null, + snapshot.runtime.lastExitStatus != null ? `lastExit=${snapshot.runtime.lastExitStatus}` : null, + ] + .filter(Boolean) + .join(", "); + + if (runtimeSummary) { + lines.push(`Service runtime: ${runtimeSummary}`); + } + + if (snapshot.portUsage.status === "busy") { + lines.push(...formatPortDiagnostics(snapshot.portUsage)); + } else { + lines.push(`Gateway port ${snapshot.portUsage.port} status: ${snapshot.portUsage.status}.`); + } + + if (snapshot.portUsage.errors?.length) { + lines.push(`Port diagnostics errors: ${snapshot.portUsage.errors.join("; ")}`); + } + + return lines; +} + +export async function terminateStaleGatewayPids(pids: number[]): Promise { + const killed: number[] = []; + for (const pid of pids) { + try { + process.kill(pid, "SIGTERM"); + killed.push(pid); + } catch (err) { + const code = (err as NodeJS.ErrnoException)?.code; + if (code !== "ESRCH") { + throw err; + } + } + } + + if (killed.length === 0) { + return killed; + } + + await sleep(400); + + for (const pid of killed) { + try { + process.kill(pid, 0); + process.kill(pid, "SIGKILL"); + } catch (err) { + const code = (err as NodeJS.ErrnoException)?.code; + if (code !== "ESRCH") { + throw err; + } + } + } + + return killed; +} diff --git a/src/cli/update-cli/update-command.ts b/src/cli/update-cli/update-command.ts index 4a20a7c75..a2a923d3a 100644 --- a/src/cli/update-cli/update-command.ts +++ b/src/cli/update-cli/update-command.ts @@ -10,14 +10,7 @@ import { resolveGatewayPort, writeConfigFile, } from "../../config/config.js"; -import type { GatewayServiceRuntime } from "../../daemon/service-runtime.js"; import { resolveGatewayService } from "../../daemon/service.js"; -import { - classifyPortListener, - formatPortDiagnostics, - inspectPortUsage, - type PortUsage, -} from "../../infra/ports.js"; import { channelToNpmTag, DEFAULT_GIT_CHANNEL, @@ -40,11 +33,16 @@ import { runCommandWithTimeout } from "../../process/exec.js"; import { defaultRuntime } from "../../runtime.js"; import { stylePromptMessage } from "../../terminal/prompt-style.js"; import { theme } from "../../terminal/theme.js"; -import { pathExists, sleep } from "../../utils.js"; +import { pathExists } from "../../utils.js"; import { replaceCliName, resolveCliName } from "../cli-name.js"; import { formatCliCommand } from "../command-format.js"; import { installCompletion } from "../completion-cli.js"; import { runDaemonInstall, runDaemonRestart } from "../daemon-cli.js"; +import { + renderRestartDiagnostics, + terminateStaleGatewayPids, + waitForGatewayHealthyRestart, +} from "../daemon-cli/restart-health.js"; import { createUpdateProgress, printResult } from "./progress.js"; import { prepareRestartScript, runRestartScript } from "./restart-helper.js"; import { @@ -67,8 +65,6 @@ import { suppressDeprecations } from "./suppress-deprecations.js"; const CLI_NAME = resolveCliName(); const SERVICE_REFRESH_TIMEOUT_MS = 60_000; -const POST_RESTART_HEALTH_ATTEMPTS = 8; -const POST_RESTART_HEALTH_DELAY_MS = 450; const UPDATE_QUIPS = [ "Leveled up! New skills unlocked. You're welcome.", @@ -97,13 +93,6 @@ function pickUpdateQuip(): string { return UPDATE_QUIPS[Math.floor(Math.random() * UPDATE_QUIPS.length)] ?? "Update complete."; } -type GatewayRestartSnapshot = { - runtime: GatewayServiceRuntime; - portUsage: PortUsage; - healthy: boolean; - staleGatewayPids: number[]; -}; - function resolveGatewayInstallEntrypointCandidates(root?: string): string[] { if (!root) { return []; @@ -151,126 +140,6 @@ async function refreshGatewayServiceEnv(params: { await runDaemonInstall({ force: true, json: params.jsonMode || undefined }); } -async function inspectGatewayRestart(port: number): Promise { - const service = resolveGatewayService(); - let runtime: GatewayServiceRuntime = { status: "unknown" }; - try { - runtime = await service.readRuntime(process.env); - } catch (err) { - runtime = { status: "unknown", detail: String(err) }; - } - - let portUsage: PortUsage; - try { - portUsage = await inspectPortUsage(port); - } catch (err) { - portUsage = { - port, - status: "unknown", - listeners: [], - hints: [], - errors: [String(err)], - }; - } - - const gatewayListeners = - portUsage.status === "busy" - ? portUsage.listeners.filter((listener) => classifyPortListener(listener, port) === "gateway") - : []; - const running = runtime.status === "running"; - const ownsPort = - runtime.pid != null - ? portUsage.listeners.some((listener) => listener.pid === runtime.pid) - : gatewayListeners.length > 0 || - (portUsage.status === "busy" && portUsage.listeners.length === 0); - const healthy = running && ownsPort; - const staleGatewayPids = Array.from( - new Set( - gatewayListeners - .map((listener) => listener.pid) - .filter((pid): pid is number => Number.isFinite(pid)) - .filter((pid) => runtime.pid == null || pid !== runtime.pid || !running), - ), - ); - - return { - runtime, - portUsage, - healthy, - staleGatewayPids, - }; -} - -async function waitForGatewayHealthyRestart(port: number): Promise { - let snapshot = await inspectGatewayRestart(port); - for (let attempt = 0; attempt < POST_RESTART_HEALTH_ATTEMPTS; attempt += 1) { - if (snapshot.healthy) { - return snapshot; - } - if (snapshot.staleGatewayPids.length > 0 && snapshot.runtime.status !== "running") { - return snapshot; - } - await sleep(POST_RESTART_HEALTH_DELAY_MS); - snapshot = await inspectGatewayRestart(port); - } - return snapshot; -} - -function renderRestartDiagnostics(snapshot: GatewayRestartSnapshot): string[] { - const lines: string[] = []; - const runtimeSummary = [ - snapshot.runtime.status ? `status=${snapshot.runtime.status}` : null, - snapshot.runtime.state ? `state=${snapshot.runtime.state}` : null, - snapshot.runtime.pid != null ? `pid=${snapshot.runtime.pid}` : null, - snapshot.runtime.lastExitStatus != null ? `lastExit=${snapshot.runtime.lastExitStatus}` : null, - ] - .filter(Boolean) - .join(", "); - if (runtimeSummary) { - lines.push(`Service runtime: ${runtimeSummary}`); - } - if (snapshot.portUsage.status === "busy") { - lines.push(...formatPortDiagnostics(snapshot.portUsage)); - } else { - lines.push(`Gateway port ${snapshot.portUsage.port} status: ${snapshot.portUsage.status}.`); - } - if (snapshot.portUsage.errors?.length) { - lines.push(`Port diagnostics errors: ${snapshot.portUsage.errors.join("; ")}`); - } - return lines; -} - -async function terminateStaleGatewayPids(pids: number[]): Promise { - const killed: number[] = []; - for (const pid of pids) { - try { - process.kill(pid, "SIGTERM"); - killed.push(pid); - } catch (err) { - const code = (err as NodeJS.ErrnoException)?.code; - if (code !== "ESRCH") { - throw err; - } - } - } - if (killed.length === 0) { - return killed; - } - await sleep(400); - for (const pid of killed) { - try { - process.kill(pid, 0); - process.kill(pid, "SIGKILL"); - } catch (err) { - const code = (err as NodeJS.ErrnoException)?.code; - if (code !== "ESRCH") { - throw err; - } - } - } - return killed; -} - async function tryInstallShellCompletion(opts: { jsonMode: boolean; skipPrompt: boolean; @@ -633,7 +502,11 @@ async function maybeRestartService(params: { } if (!params.opts.json && restartInitiated) { - let health = await waitForGatewayHealthyRestart(params.gatewayPort); + const service = resolveGatewayService(); + let health = await waitForGatewayHealthyRestart({ + service, + port: params.gatewayPort, + }); if (!health.healthy && health.staleGatewayPids.length > 0) { if (!params.opts.json) { defaultRuntime.log( @@ -644,7 +517,10 @@ async function maybeRestartService(params: { } await terminateStaleGatewayPids(health.staleGatewayPids); await runDaemonRestart(); - health = await waitForGatewayHealthyRestart(params.gatewayPort); + health = await waitForGatewayHealthyRestart({ + service, + port: params.gatewayPort, + }); } if (health.healthy) {