Gateway: stop and restart unmanaged listeners (#39355)
* Daemon: allow unmanaged gateway lifecycle fallback * Status: fix service summary formatting * Changelog: note unmanaged gateway lifecycle fallback * Tests: cover unmanaged gateway lifecycle fallback * Daemon: split unmanaged restart health checks * Daemon: harden unmanaged gateway signaling * Daemon: reject unmanaged restarts when disabled
This commit is contained in:
@@ -40,10 +40,11 @@ vi.mock("../../runtime.js", () => ({
|
||||
}));
|
||||
|
||||
let runServiceRestart: typeof import("./lifecycle-core.js").runServiceRestart;
|
||||
let runServiceStop: typeof import("./lifecycle-core.js").runServiceStop;
|
||||
|
||||
describe("runServiceRestart token drift", () => {
|
||||
beforeAll(async () => {
|
||||
({ runServiceRestart } = await import("./lifecycle-core.js"));
|
||||
({ runServiceRestart, runServiceStop } = await import("./lifecycle-core.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -130,4 +131,49 @@ describe("runServiceRestart token drift", () => {
|
||||
const payload = JSON.parse(jsonLine ?? "{}") as { warnings?: string[] };
|
||||
expect(payload.warnings).toBeUndefined();
|
||||
});
|
||||
|
||||
it("emits stopped when an unmanaged process handles stop", async () => {
|
||||
service.isLoaded.mockResolvedValue(false);
|
||||
|
||||
await runServiceStop({
|
||||
serviceNoun: "Gateway",
|
||||
service,
|
||||
opts: { json: true },
|
||||
onNotLoaded: async () => ({
|
||||
result: "stopped",
|
||||
message: "Gateway stop signal sent to unmanaged process on port 18789: 4200.",
|
||||
}),
|
||||
});
|
||||
|
||||
const jsonLine = runtimeLogs.find((line) => line.trim().startsWith("{"));
|
||||
const payload = JSON.parse(jsonLine ?? "{}") as { result?: string; message?: string };
|
||||
expect(payload.result).toBe("stopped");
|
||||
expect(payload.message).toContain("unmanaged process");
|
||||
expect(service.stop).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("runs restart health checks after an unmanaged restart signal", async () => {
|
||||
const postRestartCheck = vi.fn(async () => {});
|
||||
service.isLoaded.mockResolvedValue(false);
|
||||
|
||||
await runServiceRestart({
|
||||
serviceNoun: "Gateway",
|
||||
service,
|
||||
renderStartHints: () => [],
|
||||
opts: { json: true },
|
||||
onNotLoaded: async () => ({
|
||||
result: "restarted",
|
||||
message: "Gateway restart signal sent to unmanaged process on port 18789: 4200.",
|
||||
}),
|
||||
postRestartCheck,
|
||||
});
|
||||
|
||||
expect(postRestartCheck).toHaveBeenCalledTimes(1);
|
||||
expect(service.restart).not.toHaveBeenCalled();
|
||||
expect(service.readCommand).not.toHaveBeenCalled();
|
||||
const jsonLine = runtimeLogs.find((line) => line.trim().startsWith("{"));
|
||||
const payload = JSON.parse(jsonLine ?? "{}") as { result?: string; message?: string };
|
||||
expect(payload.result).toBe("restarted");
|
||||
expect(payload.message).toContain("unmanaged process");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -28,6 +28,18 @@ type RestartPostCheckContext = {
|
||||
fail: (message: string, hints?: string[]) => void;
|
||||
};
|
||||
|
||||
type NotLoadedActionResult = {
|
||||
result: "stopped" | "restarted";
|
||||
message?: string;
|
||||
warnings?: string[];
|
||||
};
|
||||
|
||||
type NotLoadedActionContext = {
|
||||
json: boolean;
|
||||
stdout: Writable;
|
||||
fail: (message: string, hints?: string[]) => void;
|
||||
};
|
||||
|
||||
async function maybeAugmentSystemdHints(hints: string[]): Promise<string[]> {
|
||||
if (process.platform !== "linux") {
|
||||
return hints;
|
||||
@@ -200,6 +212,7 @@ export async function runServiceStop(params: {
|
||||
serviceNoun: string;
|
||||
service: GatewayService;
|
||||
opts?: DaemonLifecycleOptions;
|
||||
onNotLoaded?: (ctx: NotLoadedActionContext) => Promise<NotLoadedActionResult | null>;
|
||||
}) {
|
||||
const json = Boolean(params.opts?.json);
|
||||
const { stdout, emit, fail } = createActionIO({ action: "stop", json });
|
||||
@@ -213,6 +226,25 @@ export async function runServiceStop(params: {
|
||||
return;
|
||||
}
|
||||
if (!loaded) {
|
||||
try {
|
||||
const handled = await params.onNotLoaded?.({ json, stdout, fail });
|
||||
if (handled) {
|
||||
emit({
|
||||
ok: true,
|
||||
result: handled.result,
|
||||
message: handled.message,
|
||||
warnings: handled.warnings,
|
||||
service: buildDaemonServiceSnapshot(params.service, false),
|
||||
});
|
||||
if (!json && handled.message) {
|
||||
defaultRuntime.log(handled.message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
fail(`${params.serviceNoun} stop failed: ${String(err)}`);
|
||||
return;
|
||||
}
|
||||
emit({
|
||||
ok: true,
|
||||
result: "not-loaded",
|
||||
@@ -251,9 +283,12 @@ export async function runServiceRestart(params: {
|
||||
opts?: DaemonLifecycleOptions;
|
||||
checkTokenDrift?: boolean;
|
||||
postRestartCheck?: (ctx: RestartPostCheckContext) => Promise<void>;
|
||||
onNotLoaded?: (ctx: NotLoadedActionContext) => Promise<NotLoadedActionResult | null>;
|
||||
}): Promise<boolean> {
|
||||
const json = Boolean(params.opts?.json);
|
||||
const { stdout, emit, fail } = createActionIO({ action: "restart", json });
|
||||
const warnings: string[] = [];
|
||||
let handledNotLoaded: NotLoadedActionResult | null = null;
|
||||
|
||||
const loaded = await resolveServiceLoadedOrFail({
|
||||
serviceNoun: params.serviceNoun,
|
||||
@@ -264,19 +299,29 @@ export async function runServiceRestart(params: {
|
||||
return false;
|
||||
}
|
||||
if (!loaded) {
|
||||
await handleServiceNotLoaded({
|
||||
serviceNoun: params.serviceNoun,
|
||||
service: params.service,
|
||||
loaded,
|
||||
renderStartHints: params.renderStartHints,
|
||||
json,
|
||||
emit,
|
||||
});
|
||||
return false;
|
||||
try {
|
||||
handledNotLoaded = (await params.onNotLoaded?.({ json, stdout, fail })) ?? null;
|
||||
} catch (err) {
|
||||
fail(`${params.serviceNoun} restart failed: ${String(err)}`);
|
||||
return false;
|
||||
}
|
||||
if (!handledNotLoaded) {
|
||||
await handleServiceNotLoaded({
|
||||
serviceNoun: params.serviceNoun,
|
||||
service: params.service,
|
||||
loaded,
|
||||
renderStartHints: params.renderStartHints,
|
||||
json,
|
||||
emit,
|
||||
});
|
||||
return false;
|
||||
}
|
||||
if (handledNotLoaded.warnings?.length) {
|
||||
warnings.push(...handledNotLoaded.warnings);
|
||||
}
|
||||
}
|
||||
|
||||
const warnings: string[] = [];
|
||||
if (params.checkTokenDrift) {
|
||||
if (loaded && params.checkTokenDrift) {
|
||||
// Check for token drift before restart (service token vs config token)
|
||||
try {
|
||||
const command = await params.service.readCommand(process.env);
|
||||
@@ -309,22 +354,30 @@ export async function runServiceRestart(params: {
|
||||
}
|
||||
|
||||
try {
|
||||
await params.service.restart({ env: process.env, stdout });
|
||||
if (loaded) {
|
||||
await params.service.restart({ env: process.env, stdout });
|
||||
}
|
||||
if (params.postRestartCheck) {
|
||||
await params.postRestartCheck({ json, stdout, warnings, fail });
|
||||
}
|
||||
let restarted = true;
|
||||
try {
|
||||
restarted = await params.service.isLoaded({ env: process.env });
|
||||
} catch {
|
||||
restarted = true;
|
||||
let restarted = loaded;
|
||||
if (loaded) {
|
||||
try {
|
||||
restarted = await params.service.isLoaded({ env: process.env });
|
||||
} catch {
|
||||
restarted = true;
|
||||
}
|
||||
}
|
||||
emit({
|
||||
ok: true,
|
||||
result: "restarted",
|
||||
message: handledNotLoaded?.message,
|
||||
service: buildDaemonServiceSnapshot(params.service, restarted),
|
||||
warnings: warnings.length ? warnings : undefined,
|
||||
});
|
||||
if (!json && handledNotLoaded?.message) {
|
||||
defaultRuntime.log(handledNotLoaded.message);
|
||||
}
|
||||
return true;
|
||||
} catch (err) {
|
||||
const hints = params.renderStartHints();
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const mockReadFileSync = vi.hoisted(() => vi.fn());
|
||||
const mockSpawnSync = vi.hoisted(() => vi.fn());
|
||||
|
||||
type RestartHealthSnapshot = {
|
||||
healthy: boolean;
|
||||
@@ -25,18 +28,59 @@ const service = {
|
||||
};
|
||||
|
||||
const runServiceRestart = vi.fn();
|
||||
const runServiceStop = vi.fn();
|
||||
const waitForGatewayHealthyListener = vi.fn();
|
||||
const waitForGatewayHealthyRestart = vi.fn();
|
||||
const terminateStaleGatewayPids = vi.fn();
|
||||
const renderGatewayPortHealthDiagnostics = vi.fn(() => ["diag: unhealthy port"]);
|
||||
const renderRestartDiagnostics = vi.fn(() => ["diag: unhealthy runtime"]);
|
||||
const resolveGatewayPort = vi.fn(() => 18789);
|
||||
const findGatewayPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []);
|
||||
const probeGateway = vi.fn<
|
||||
(opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
|
||||
const loadConfig = vi.fn(() => ({}));
|
||||
|
||||
vi.mock("node:fs", () => ({
|
||||
default: {
|
||||
readFileSync: (...args: unknown[]) => mockReadFileSync(...args),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
spawnSync: (...args: unknown[]) => mockSpawnSync(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/config.js", () => ({
|
||||
loadConfig: () => loadConfig(),
|
||||
readBestEffortConfig: async () => loadConfig(),
|
||||
resolveGatewayPort,
|
||||
}));
|
||||
|
||||
vi.mock("../../infra/restart.js", () => ({
|
||||
findGatewayPidsOnPortSync: (port: number) => findGatewayPidsOnPortSync(port),
|
||||
}));
|
||||
|
||||
vi.mock("../../gateway/probe.js", () => ({
|
||||
probeGateway: (opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => probeGateway(opts),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/commands.js", () => ({
|
||||
isRestartEnabled: (config?: { commands?: unknown }) => isRestartEnabled(config),
|
||||
}));
|
||||
|
||||
vi.mock("../../daemon/service.js", () => ({
|
||||
resolveGatewayService: () => service,
|
||||
}));
|
||||
@@ -44,7 +88,9 @@ vi.mock("../../daemon/service.js", () => ({
|
||||
vi.mock("./restart-health.js", () => ({
|
||||
DEFAULT_RESTART_HEALTH_ATTEMPTS: 120,
|
||||
DEFAULT_RESTART_HEALTH_DELAY_MS: 500,
|
||||
waitForGatewayHealthyListener,
|
||||
waitForGatewayHealthyRestart,
|
||||
renderGatewayPortHealthDiagnostics,
|
||||
terminateStaleGatewayPids,
|
||||
renderRestartDiagnostics,
|
||||
}));
|
||||
@@ -52,26 +98,35 @@ vi.mock("./restart-health.js", () => ({
|
||||
vi.mock("./lifecycle-core.js", () => ({
|
||||
runServiceRestart,
|
||||
runServiceStart: vi.fn(),
|
||||
runServiceStop: vi.fn(),
|
||||
runServiceStop,
|
||||
runServiceUninstall: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("runDaemonRestart health checks", () => {
|
||||
let runDaemonRestart: (opts?: { json?: boolean }) => Promise<boolean>;
|
||||
let runDaemonStop: (opts?: { json?: boolean }) => Promise<void>;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ runDaemonRestart } = await import("./lifecycle.js"));
|
||||
({ runDaemonRestart, runDaemonStop } = await import("./lifecycle.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
service.readCommand.mockClear();
|
||||
service.restart.mockClear();
|
||||
runServiceRestart.mockClear();
|
||||
waitForGatewayHealthyRestart.mockClear();
|
||||
terminateStaleGatewayPids.mockClear();
|
||||
renderRestartDiagnostics.mockClear();
|
||||
resolveGatewayPort.mockClear();
|
||||
loadConfig.mockClear();
|
||||
service.readCommand.mockReset();
|
||||
service.restart.mockReset();
|
||||
runServiceRestart.mockReset();
|
||||
runServiceStop.mockReset();
|
||||
waitForGatewayHealthyListener.mockReset();
|
||||
waitForGatewayHealthyRestart.mockReset();
|
||||
terminateStaleGatewayPids.mockReset();
|
||||
renderGatewayPortHealthDiagnostics.mockReset();
|
||||
renderRestartDiagnostics.mockReset();
|
||||
resolveGatewayPort.mockReset();
|
||||
findGatewayPidsOnPortSync.mockReset();
|
||||
probeGateway.mockReset();
|
||||
isRestartEnabled.mockReset();
|
||||
loadConfig.mockReset();
|
||||
mockReadFileSync.mockReset();
|
||||
mockSpawnSync.mockReset();
|
||||
|
||||
service.readCommand.mockResolvedValue({
|
||||
programArguments: ["openclaw", "gateway", "--port", "18789"],
|
||||
@@ -92,6 +147,37 @@ describe("runDaemonRestart health checks", () => {
|
||||
});
|
||||
return true;
|
||||
});
|
||||
runServiceStop.mockResolvedValue(undefined);
|
||||
waitForGatewayHealthyListener.mockResolvedValue({
|
||||
healthy: true,
|
||||
portUsage: { port: 18789, status: "busy", listeners: [], hints: [] },
|
||||
});
|
||||
probeGateway.mockResolvedValue({
|
||||
ok: true,
|
||||
configSnapshot: { commands: { restart: true } },
|
||||
});
|
||||
isRestartEnabled.mockReturnValue(true);
|
||||
mockReadFileSync.mockImplementation((path: string) => {
|
||||
const match = path.match(/\/proc\/(\d+)\/cmdline$/);
|
||||
if (!match) {
|
||||
throw new Error(`unexpected path ${path}`);
|
||||
}
|
||||
const pid = Number.parseInt(match[1] ?? "", 10);
|
||||
if ([4200, 4300].includes(pid)) {
|
||||
return ["openclaw", "gateway", "--port", "18789", ""].join("\0");
|
||||
}
|
||||
throw new Error(`unknown pid ${pid}`);
|
||||
});
|
||||
mockSpawnSync.mockReturnValue({
|
||||
error: null,
|
||||
status: 0,
|
||||
stdout: "openclaw gateway --port 18789",
|
||||
stderr: "",
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("kills stale gateway pids and retries restart", async () => {
|
||||
@@ -134,4 +220,99 @@ describe("runDaemonRestart health checks", () => {
|
||||
expect(terminateStaleGatewayPids).not.toHaveBeenCalled();
|
||||
expect(renderRestartDiagnostics).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("signals an unmanaged gateway process on stop", async () => {
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
findGatewayPidsOnPortSync.mockReturnValue([4200, 4200, 4300]);
|
||||
runServiceStop.mockImplementation(async (params: { onNotLoaded?: () => Promise<unknown> }) => {
|
||||
await params.onNotLoaded?.();
|
||||
});
|
||||
|
||||
await runDaemonStop({ json: true });
|
||||
|
||||
expect(findGatewayPidsOnPortSync).toHaveBeenCalledWith(18789);
|
||||
expect(killSpy).toHaveBeenCalledWith(4200, "SIGTERM");
|
||||
expect(killSpy).toHaveBeenCalledWith(4300, "SIGTERM");
|
||||
});
|
||||
|
||||
it("signals a single unmanaged gateway process on restart", async () => {
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
findGatewayPidsOnPortSync.mockReturnValue([4200]);
|
||||
runServiceRestart.mockImplementation(
|
||||
async (params: RestartParams & { onNotLoaded?: () => Promise<unknown> }) => {
|
||||
await params.onNotLoaded?.();
|
||||
await params.postRestartCheck?.({
|
||||
json: Boolean(params.opts?.json),
|
||||
stdout: process.stdout,
|
||||
warnings: [],
|
||||
fail: (message: string) => {
|
||||
throw new Error(message);
|
||||
},
|
||||
});
|
||||
return true;
|
||||
},
|
||||
);
|
||||
|
||||
await runDaemonRestart({ json: true });
|
||||
|
||||
expect(findGatewayPidsOnPortSync).toHaveBeenCalledWith(18789);
|
||||
expect(killSpy).toHaveBeenCalledWith(4200, "SIGUSR1");
|
||||
expect(probeGateway).toHaveBeenCalledTimes(1);
|
||||
expect(waitForGatewayHealthyListener).toHaveBeenCalledTimes(1);
|
||||
expect(waitForGatewayHealthyRestart).not.toHaveBeenCalled();
|
||||
expect(terminateStaleGatewayPids).not.toHaveBeenCalled();
|
||||
expect(service.restart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("fails unmanaged restart when multiple gateway listeners are present", async () => {
|
||||
findGatewayPidsOnPortSync.mockReturnValue([4200, 4300]);
|
||||
runServiceRestart.mockImplementation(
|
||||
async (params: RestartParams & { onNotLoaded?: () => Promise<unknown> }) => {
|
||||
await params.onNotLoaded?.();
|
||||
return true;
|
||||
},
|
||||
);
|
||||
|
||||
await expect(runDaemonRestart({ json: true })).rejects.toThrow(
|
||||
"multiple gateway processes are listening on port 18789",
|
||||
);
|
||||
});
|
||||
|
||||
it("fails unmanaged restart when the running gateway has commands.restart disabled", async () => {
|
||||
findGatewayPidsOnPortSync.mockReturnValue([4200]);
|
||||
probeGateway.mockResolvedValue({
|
||||
ok: true,
|
||||
configSnapshot: { commands: { restart: false } },
|
||||
});
|
||||
isRestartEnabled.mockReturnValue(false);
|
||||
runServiceRestart.mockImplementation(
|
||||
async (params: RestartParams & { onNotLoaded?: () => Promise<unknown> }) => {
|
||||
await params.onNotLoaded?.();
|
||||
return true;
|
||||
},
|
||||
);
|
||||
|
||||
await expect(runDaemonRestart({ json: true })).rejects.toThrow(
|
||||
"Gateway restart is disabled in the running gateway config",
|
||||
);
|
||||
});
|
||||
|
||||
it("skips unmanaged signaling for pids that are not live gateway processes", async () => {
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
findGatewayPidsOnPortSync.mockReturnValue([4200]);
|
||||
mockReadFileSync.mockReturnValue(["python", "-m", "http.server", ""].join("\0"));
|
||||
mockSpawnSync.mockReturnValue({
|
||||
error: null,
|
||||
status: 0,
|
||||
stdout: "python -m http.server",
|
||||
stderr: "",
|
||||
});
|
||||
runServiceStop.mockImplementation(async (params: { onNotLoaded?: () => Promise<unknown> }) => {
|
||||
await params.onNotLoaded?.();
|
||||
});
|
||||
|
||||
await runDaemonStop({ json: true });
|
||||
|
||||
expect(killSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import fsSync from "node:fs";
|
||||
import { isRestartEnabled } from "../../config/commands.js";
|
||||
import { readBestEffortConfig, resolveGatewayPort } from "../../config/config.js";
|
||||
import { resolveGatewayService } from "../../daemon/service.js";
|
||||
import { probeGateway } from "../../gateway/probe.js";
|
||||
import { findGatewayPidsOnPortSync } from "../../infra/restart.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { theme } from "../../terminal/theme.js";
|
||||
import { formatCliCommand } from "../command-format.js";
|
||||
@@ -12,8 +17,10 @@ import {
|
||||
import {
|
||||
DEFAULT_RESTART_HEALTH_ATTEMPTS,
|
||||
DEFAULT_RESTART_HEALTH_DELAY_MS,
|
||||
renderGatewayPortHealthDiagnostics,
|
||||
renderRestartDiagnostics,
|
||||
terminateStaleGatewayPids,
|
||||
waitForGatewayHealthyListener,
|
||||
waitForGatewayHealthyRestart,
|
||||
} from "./restart-health.js";
|
||||
import { parsePortFromArgs, renderGatewayServiceStartHints } from "./shared.js";
|
||||
@@ -22,8 +29,7 @@ import type { DaemonLifecycleOptions } from "./types.js";
|
||||
const POST_RESTART_HEALTH_ATTEMPTS = DEFAULT_RESTART_HEALTH_ATTEMPTS;
|
||||
const POST_RESTART_HEALTH_DELAY_MS = DEFAULT_RESTART_HEALTH_DELAY_MS;
|
||||
|
||||
async function resolveGatewayRestartPort() {
|
||||
const service = resolveGatewayService();
|
||||
async function resolveGatewayLifecyclePort(service = resolveGatewayService()) {
|
||||
const command = await service.readCommand(process.env).catch(() => null);
|
||||
const serviceEnv = command?.environment ?? undefined;
|
||||
const mergedEnv = {
|
||||
@@ -35,6 +41,145 @@ async function resolveGatewayRestartPort() {
|
||||
return portFromArgs ?? resolveGatewayPort(await readBestEffortConfig(), mergedEnv);
|
||||
}
|
||||
|
||||
function normalizeProcArg(arg: string): string {
|
||||
return arg.replaceAll("\\", "/").toLowerCase();
|
||||
}
|
||||
|
||||
function parseProcCmdline(raw: string): string[] {
|
||||
return raw
|
||||
.split("\0")
|
||||
.map((entry) => entry.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function isGatewayArgv(args: string[]): boolean {
|
||||
const normalized = args.map(normalizeProcArg);
|
||||
if (!normalized.includes("gateway")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const entryCandidates = [
|
||||
"dist/index.js",
|
||||
"dist/entry.js",
|
||||
"openclaw.mjs",
|
||||
"scripts/run-node.mjs",
|
||||
"src/index.ts",
|
||||
];
|
||||
if (normalized.some((arg) => entryCandidates.some((entry) => arg.endsWith(entry)))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const exe = normalized[0] ?? "";
|
||||
return exe.endsWith("/openclaw") || exe === "openclaw" || exe.endsWith("/openclaw-gateway");
|
||||
}
|
||||
|
||||
function readGatewayProcessArgsSync(pid: number): string[] | null {
|
||||
if (process.platform === "linux") {
|
||||
try {
|
||||
return parseProcCmdline(fsSync.readFileSync(`/proc/${pid}/cmdline`, "utf8"));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
if (process.platform === "darwin") {
|
||||
const ps = spawnSync("ps", ["-o", "command=", "-p", String(pid)], {
|
||||
encoding: "utf8",
|
||||
timeout: 1000,
|
||||
});
|
||||
if (ps.error || ps.status !== 0) {
|
||||
return null;
|
||||
}
|
||||
const command = ps.stdout.trim();
|
||||
return command ? command.split(/\s+/) : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function resolveGatewayListenerPids(port: number): number[] {
|
||||
return Array.from(new Set(findGatewayPidsOnPortSync(port)))
|
||||
.filter((pid): pid is number => Number.isFinite(pid) && pid > 0)
|
||||
.filter((pid) => {
|
||||
const args = readGatewayProcessArgsSync(pid);
|
||||
return args != null && isGatewayArgv(args);
|
||||
});
|
||||
}
|
||||
|
||||
function resolveGatewayPortFallback(): Promise<number> {
|
||||
return readBestEffortConfig()
|
||||
.then((cfg) => resolveGatewayPort(cfg, process.env))
|
||||
.catch(() => resolveGatewayPort(undefined, process.env));
|
||||
}
|
||||
|
||||
function signalGatewayPid(pid: number, signal: "SIGTERM" | "SIGUSR1") {
|
||||
const args = readGatewayProcessArgsSync(pid);
|
||||
if (!args || !isGatewayArgv(args)) {
|
||||
throw new Error(`refusing to signal non-gateway process pid ${pid}`);
|
||||
}
|
||||
process.kill(pid, signal);
|
||||
}
|
||||
|
||||
function formatGatewayPidList(pids: number[]): string {
|
||||
return pids.join(", ");
|
||||
}
|
||||
|
||||
async function assertUnmanagedGatewayRestartEnabled(port: number): Promise<void> {
|
||||
const probe = await probeGateway({
|
||||
url: `ws://127.0.0.1:${port}`,
|
||||
auth: {
|
||||
token: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined,
|
||||
password: process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined,
|
||||
},
|
||||
timeoutMs: 1_000,
|
||||
}).catch(() => null);
|
||||
|
||||
if (!probe?.ok) {
|
||||
return;
|
||||
}
|
||||
if (!isRestartEnabled(probe.configSnapshot as { commands?: unknown } | undefined)) {
|
||||
throw new Error(
|
||||
"Gateway restart is disabled in the running gateway config (commands.restart=false); unmanaged SIGUSR1 restart would be ignored",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function resolveVerifiedGatewayListenerPids(port: number): number[] {
|
||||
return resolveGatewayListenerPids(port).filter(
|
||||
(pid): pid is number => Number.isFinite(pid) && pid > 0,
|
||||
);
|
||||
}
|
||||
|
||||
async function stopGatewayWithoutServiceManager(port: number) {
|
||||
const pids = resolveVerifiedGatewayListenerPids(port);
|
||||
if (pids.length === 0) {
|
||||
return null;
|
||||
}
|
||||
for (const pid of pids) {
|
||||
signalGatewayPid(pid, "SIGTERM");
|
||||
}
|
||||
return {
|
||||
result: "stopped" as const,
|
||||
message: `Gateway stop signal sent to unmanaged process${pids.length === 1 ? "" : "es"} on port ${port}: ${formatGatewayPidList(pids)}.`,
|
||||
};
|
||||
}
|
||||
|
||||
async function restartGatewayWithoutServiceManager(port: number) {
|
||||
await assertUnmanagedGatewayRestartEnabled(port);
|
||||
const pids = resolveVerifiedGatewayListenerPids(port);
|
||||
if (pids.length === 0) {
|
||||
return null;
|
||||
}
|
||||
if (pids.length > 1) {
|
||||
throw new Error(
|
||||
`multiple gateway processes are listening on port ${port}: ${formatGatewayPidList(pids)}; use "openclaw gateway status --deep" before retrying restart`,
|
||||
);
|
||||
}
|
||||
signalGatewayPid(pids[0], "SIGUSR1");
|
||||
return {
|
||||
result: "restarted" as const,
|
||||
message: `Gateway restart signal sent to unmanaged process on port ${port}: ${pids[0]}.`,
|
||||
};
|
||||
}
|
||||
|
||||
export async function runDaemonUninstall(opts: DaemonLifecycleOptions = {}) {
|
||||
return await runServiceUninstall({
|
||||
serviceNoun: "Gateway",
|
||||
@@ -55,10 +200,15 @@ export async function runDaemonStart(opts: DaemonLifecycleOptions = {}) {
|
||||
}
|
||||
|
||||
export async function runDaemonStop(opts: DaemonLifecycleOptions = {}) {
|
||||
const service = resolveGatewayService();
|
||||
const gatewayPort = await resolveGatewayLifecyclePort(service).catch(() =>
|
||||
resolveGatewayPortFallback(),
|
||||
);
|
||||
return await runServiceStop({
|
||||
serviceNoun: "Gateway",
|
||||
service: resolveGatewayService(),
|
||||
service,
|
||||
opts,
|
||||
onNotLoaded: async () => stopGatewayWithoutServiceManager(gatewayPort),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -70,8 +220,9 @@ export async function runDaemonStop(opts: DaemonLifecycleOptions = {}) {
|
||||
export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promise<boolean> {
|
||||
const json = Boolean(opts.json);
|
||||
const service = resolveGatewayService();
|
||||
const restartPort = await resolveGatewayRestartPort().catch(async () =>
|
||||
resolveGatewayPort(await readBestEffortConfig(), process.env),
|
||||
let restartedWithoutServiceManager = false;
|
||||
const restartPort = await resolveGatewayLifecyclePort(service).catch(() =>
|
||||
resolveGatewayPortFallback(),
|
||||
);
|
||||
const restartWaitMs = POST_RESTART_HEALTH_ATTEMPTS * POST_RESTART_HEALTH_DELAY_MS;
|
||||
const restartWaitSeconds = Math.round(restartWaitMs / 1000);
|
||||
@@ -82,7 +233,42 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi
|
||||
renderStartHints: renderGatewayServiceStartHints,
|
||||
opts,
|
||||
checkTokenDrift: true,
|
||||
onNotLoaded: async () => {
|
||||
const handled = await restartGatewayWithoutServiceManager(restartPort);
|
||||
if (handled) {
|
||||
restartedWithoutServiceManager = true;
|
||||
}
|
||||
return handled;
|
||||
},
|
||||
postRestartCheck: async ({ warnings, fail, stdout }) => {
|
||||
if (restartedWithoutServiceManager) {
|
||||
const health = await waitForGatewayHealthyListener({
|
||||
port: restartPort,
|
||||
attempts: POST_RESTART_HEALTH_ATTEMPTS,
|
||||
delayMs: POST_RESTART_HEALTH_DELAY_MS,
|
||||
});
|
||||
if (health.healthy) {
|
||||
return;
|
||||
}
|
||||
|
||||
const diagnostics = renderGatewayPortHealthDiagnostics(health);
|
||||
const timeoutLine = `Timed out after ${restartWaitSeconds}s waiting for gateway port ${restartPort} to become healthy.`;
|
||||
if (!json) {
|
||||
defaultRuntime.log(theme.warn(timeoutLine));
|
||||
for (const line of diagnostics) {
|
||||
defaultRuntime.log(theme.muted(line));
|
||||
}
|
||||
} else {
|
||||
warnings.push(timeoutLine);
|
||||
warnings.push(...diagnostics);
|
||||
}
|
||||
|
||||
fail(`Gateway restart timed out after ${restartWaitSeconds}s waiting for health checks.`, [
|
||||
formatCliCommand("openclaw gateway status --deep"),
|
||||
formatCliCommand("openclaw doctor"),
|
||||
]);
|
||||
}
|
||||
|
||||
let health = await waitForGatewayHealthyRestart({
|
||||
service,
|
||||
port: restartPort,
|
||||
|
||||
@@ -23,6 +23,11 @@ export type GatewayRestartSnapshot = {
|
||||
staleGatewayPids: number[];
|
||||
};
|
||||
|
||||
export type GatewayPortHealthSnapshot = {
|
||||
portUsage: PortUsage;
|
||||
healthy: boolean;
|
||||
};
|
||||
|
||||
function listenerOwnedByRuntimePid(params: {
|
||||
listener: PortUsage["listeners"][number];
|
||||
runtimePid: number;
|
||||
@@ -55,6 +60,32 @@ async function confirmGatewayReachable(port: number): Promise<boolean> {
|
||||
return probe.ok || looksLikeAuthClose(probe.close?.code, probe.close?.reason);
|
||||
}
|
||||
|
||||
async function inspectGatewayPortHealth(port: number): Promise<GatewayPortHealthSnapshot> {
|
||||
let portUsage: PortUsage;
|
||||
try {
|
||||
portUsage = await inspectPortUsage(port);
|
||||
} catch (err) {
|
||||
portUsage = {
|
||||
port,
|
||||
status: "unknown",
|
||||
listeners: [],
|
||||
hints: [],
|
||||
errors: [String(err)],
|
||||
};
|
||||
}
|
||||
|
||||
let healthy = false;
|
||||
if (portUsage.status === "busy") {
|
||||
try {
|
||||
healthy = await confirmGatewayReachable(port);
|
||||
} catch {
|
||||
// best-effort probe
|
||||
}
|
||||
}
|
||||
|
||||
return { portUsage, healthy };
|
||||
}
|
||||
|
||||
export async function inspectGatewayRestart(params: {
|
||||
service: GatewayService;
|
||||
port: number;
|
||||
@@ -178,6 +209,27 @@ export async function waitForGatewayHealthyRestart(params: {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
export async function waitForGatewayHealthyListener(params: {
|
||||
port: number;
|
||||
attempts?: number;
|
||||
delayMs?: number;
|
||||
}): Promise<GatewayPortHealthSnapshot> {
|
||||
const attempts = params.attempts ?? DEFAULT_RESTART_HEALTH_ATTEMPTS;
|
||||
const delayMs = params.delayMs ?? DEFAULT_RESTART_HEALTH_DELAY_MS;
|
||||
|
||||
let snapshot = await inspectGatewayPortHealth(params.port);
|
||||
|
||||
for (let attempt = 0; attempt < attempts; attempt += 1) {
|
||||
if (snapshot.healthy) {
|
||||
return snapshot;
|
||||
}
|
||||
await sleep(delayMs);
|
||||
snapshot = await inspectGatewayPortHealth(params.port);
|
||||
}
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
export function renderRestartDiagnostics(snapshot: GatewayRestartSnapshot): string[] {
|
||||
const lines: string[] = [];
|
||||
const runtimeSummary = [
|
||||
@@ -206,6 +258,22 @@ export function renderRestartDiagnostics(snapshot: GatewayRestartSnapshot): stri
|
||||
return lines;
|
||||
}
|
||||
|
||||
export function renderGatewayPortHealthDiagnostics(snapshot: GatewayPortHealthSnapshot): string[] {
|
||||
const lines: string[] = [];
|
||||
|
||||
if (snapshot.portUsage.status === "busy") {
|
||||
lines.push(...formatPortDiagnostics(snapshot.portUsage));
|
||||
} else {
|
||||
lines.push(`Gateway port ${snapshot.portUsage.port} status: ${snapshot.portUsage.status}.`);
|
||||
}
|
||||
|
||||
if (snapshot.portUsage.errors?.length) {
|
||||
lines.push(`Port diagnostics errors: ${snapshot.portUsage.errors.join("; ")}`);
|
||||
}
|
||||
|
||||
return lines;
|
||||
}
|
||||
|
||||
export async function terminateStaleGatewayPids(pids: number[]): Promise<number[]> {
|
||||
const targets = Array.from(
|
||||
new Set(pids.filter((pid): pid is number => Number.isFinite(pid) && pid > 0)),
|
||||
|
||||
Reference in New Issue
Block a user