refactor(exec): extract bash tool runtime internals
This commit is contained in:
716
src/agents/bash-tools.exec-runtime.ts
Normal file
716
src/agents/bash-tools.exec-runtime.ts
Normal file
@@ -0,0 +1,716 @@
|
||||
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
|
||||
import type { ChildProcessWithoutNullStreams } from "node:child_process";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import path from "node:path";
|
||||
import type { ExecAsk, ExecHost, ExecSecurity } from "../infra/exec-approvals.js";
|
||||
import type { ProcessSession, SessionStdin } from "./bash-process-registry.js";
|
||||
import type { ExecToolDetails } from "./bash-tools.exec.js";
|
||||
import type { BashSandboxConfig } from "./bash-tools.shared.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { logWarn } from "../logger.js";
|
||||
import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js";
|
||||
import {
|
||||
addSession,
|
||||
appendOutput,
|
||||
createSessionSlug,
|
||||
markExited,
|
||||
tail,
|
||||
} from "./bash-process-registry.js";
|
||||
import {
|
||||
buildDockerExecArgs,
|
||||
chunkString,
|
||||
clampWithDefault,
|
||||
killSession,
|
||||
readEnvInt,
|
||||
} from "./bash-tools.shared.js";
|
||||
import { buildCursorPositionResponse, stripDsrRequests } from "./pty-dsr.js";
|
||||
import { getShellConfig, sanitizeBinaryOutput } from "./shell-utils.js";
|
||||
|
||||
// Security: Blocklist of environment variables that could alter execution flow
|
||||
// or inject code when running on non-sandboxed hosts (Gateway/Node).
|
||||
const DANGEROUS_HOST_ENV_VARS = new Set([
|
||||
"LD_PRELOAD",
|
||||
"LD_LIBRARY_PATH",
|
||||
"LD_AUDIT",
|
||||
"DYLD_INSERT_LIBRARIES",
|
||||
"DYLD_LIBRARY_PATH",
|
||||
"NODE_OPTIONS",
|
||||
"NODE_PATH",
|
||||
"PYTHONPATH",
|
||||
"PYTHONHOME",
|
||||
"RUBYLIB",
|
||||
"PERL5LIB",
|
||||
"BASH_ENV",
|
||||
"ENV",
|
||||
"GCONV_PATH",
|
||||
"IFS",
|
||||
"SSLKEYLOGFILE",
|
||||
]);
|
||||
const DANGEROUS_HOST_ENV_PREFIXES = ["DYLD_", "LD_"];
|
||||
|
||||
// Centralized sanitization helper.
|
||||
// Throws an error if dangerous variables or PATH modifications are detected on the host.
|
||||
export function validateHostEnv(env: Record<string, string>): void {
|
||||
for (const key of Object.keys(env)) {
|
||||
const upperKey = key.toUpperCase();
|
||||
|
||||
// 1. Block known dangerous variables (Fail Closed)
|
||||
if (DANGEROUS_HOST_ENV_PREFIXES.some((prefix) => upperKey.startsWith(prefix))) {
|
||||
throw new Error(
|
||||
`Security Violation: Environment variable '${key}' is forbidden during host execution.`,
|
||||
);
|
||||
}
|
||||
if (DANGEROUS_HOST_ENV_VARS.has(upperKey)) {
|
||||
throw new Error(
|
||||
`Security Violation: Environment variable '${key}' is forbidden during host execution.`,
|
||||
);
|
||||
}
|
||||
|
||||
// 2. Strictly block PATH modification on host
|
||||
// Allowing custom PATH on the gateway/node can lead to binary hijacking.
|
||||
if (upperKey === "PATH") {
|
||||
throw new Error(
|
||||
"Security Violation: Custom 'PATH' variable is forbidden during host execution.",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
export const DEFAULT_MAX_OUTPUT = clampWithDefault(
|
||||
readEnvInt("PI_BASH_MAX_OUTPUT_CHARS"),
|
||||
200_000,
|
||||
1_000,
|
||||
200_000,
|
||||
);
|
||||
export const DEFAULT_PENDING_MAX_OUTPUT = clampWithDefault(
|
||||
readEnvInt("OPENCLAW_BASH_PENDING_MAX_OUTPUT_CHARS"),
|
||||
200_000,
|
||||
1_000,
|
||||
200_000,
|
||||
);
|
||||
export const DEFAULT_PATH =
|
||||
process.env.PATH ?? "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
|
||||
export const DEFAULT_NOTIFY_TAIL_CHARS = 400;
|
||||
export const DEFAULT_APPROVAL_TIMEOUT_MS = 120_000;
|
||||
export const DEFAULT_APPROVAL_REQUEST_TIMEOUT_MS = 130_000;
|
||||
const DEFAULT_APPROVAL_RUNNING_NOTICE_MS = 10_000;
|
||||
const APPROVAL_SLUG_LENGTH = 8;
|
||||
|
||||
export const execSchema = Type.Object({
|
||||
command: Type.String({ description: "Shell command to execute" }),
|
||||
workdir: Type.Optional(Type.String({ description: "Working directory (defaults to cwd)" })),
|
||||
env: Type.Optional(Type.Record(Type.String(), Type.String())),
|
||||
yieldMs: Type.Optional(
|
||||
Type.Number({
|
||||
description: "Milliseconds to wait before backgrounding (default 10000)",
|
||||
}),
|
||||
),
|
||||
background: Type.Optional(Type.Boolean({ description: "Run in background immediately" })),
|
||||
timeout: Type.Optional(
|
||||
Type.Number({
|
||||
description: "Timeout in seconds (optional, kills process on expiry)",
|
||||
}),
|
||||
),
|
||||
pty: Type.Optional(
|
||||
Type.Boolean({
|
||||
description:
|
||||
"Run in a pseudo-terminal (PTY) when available (TTY-required CLIs, coding agents)",
|
||||
}),
|
||||
),
|
||||
elevated: Type.Optional(
|
||||
Type.Boolean({
|
||||
description: "Run on the host with elevated permissions (if allowed)",
|
||||
}),
|
||||
),
|
||||
host: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec host (sandbox|gateway|node).",
|
||||
}),
|
||||
),
|
||||
security: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec security mode (deny|allowlist|full).",
|
||||
}),
|
||||
),
|
||||
ask: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec ask mode (off|on-miss|always).",
|
||||
}),
|
||||
),
|
||||
node: Type.Optional(
|
||||
Type.String({
|
||||
description: "Node id/name for host=node.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
type PtyExitEvent = { exitCode: number; signal?: number };
|
||||
type PtyListener<T> = (event: T) => void;
|
||||
type PtyHandle = {
|
||||
pid: number;
|
||||
write: (data: string | Buffer) => void;
|
||||
onData: (listener: PtyListener<string>) => void;
|
||||
onExit: (listener: PtyListener<PtyExitEvent>) => void;
|
||||
};
|
||||
type PtySpawn = (
|
||||
file: string,
|
||||
args: string[] | string,
|
||||
options: {
|
||||
name?: string;
|
||||
cols?: number;
|
||||
rows?: number;
|
||||
cwd?: string;
|
||||
env?: Record<string, string>;
|
||||
},
|
||||
) => PtyHandle;
|
||||
|
||||
export type ExecProcessOutcome = {
|
||||
status: "completed" | "failed";
|
||||
exitCode: number | null;
|
||||
exitSignal: NodeJS.Signals | number | null;
|
||||
durationMs: number;
|
||||
aggregated: string;
|
||||
timedOut: boolean;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type ExecProcessHandle = {
|
||||
session: ProcessSession;
|
||||
startedAt: number;
|
||||
pid?: number;
|
||||
promise: Promise<ExecProcessOutcome>;
|
||||
kill: () => void;
|
||||
};
|
||||
|
||||
export function normalizeExecHost(value?: string | null): ExecHost | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "sandbox" || normalized === "gateway" || normalized === "node") {
|
||||
return normalized;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function normalizeExecSecurity(value?: string | null): ExecSecurity | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "deny" || normalized === "allowlist" || normalized === "full") {
|
||||
return normalized;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function normalizeExecAsk(value?: string | null): ExecAsk | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "off" || normalized === "on-miss" || normalized === "always") {
|
||||
return normalized as ExecAsk;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function renderExecHostLabel(host: ExecHost) {
|
||||
return host === "sandbox" ? "sandbox" : host === "gateway" ? "gateway" : "node";
|
||||
}
|
||||
|
||||
export function normalizeNotifyOutput(value: string) {
|
||||
return value.replace(/\s+/g, " ").trim();
|
||||
}
|
||||
|
||||
export function normalizePathPrepend(entries?: string[]) {
|
||||
if (!Array.isArray(entries)) {
|
||||
return [];
|
||||
}
|
||||
const seen = new Set<string>();
|
||||
const normalized: string[] = [];
|
||||
for (const entry of entries) {
|
||||
if (typeof entry !== "string") {
|
||||
continue;
|
||||
}
|
||||
const trimmed = entry.trim();
|
||||
if (!trimmed || seen.has(trimmed)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(trimmed);
|
||||
normalized.push(trimmed);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function mergePathPrepend(existing: string | undefined, prepend: string[]) {
|
||||
if (prepend.length === 0) {
|
||||
return existing;
|
||||
}
|
||||
const partsExisting = (existing ?? "")
|
||||
.split(path.delimiter)
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean);
|
||||
const merged: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
for (const part of [...prepend, ...partsExisting]) {
|
||||
if (seen.has(part)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(part);
|
||||
merged.push(part);
|
||||
}
|
||||
return merged.join(path.delimiter);
|
||||
}
|
||||
|
||||
export function applyPathPrepend(
|
||||
env: Record<string, string>,
|
||||
prepend: string[],
|
||||
options?: { requireExisting?: boolean },
|
||||
) {
|
||||
if (prepend.length === 0) {
|
||||
return;
|
||||
}
|
||||
if (options?.requireExisting && !env.PATH) {
|
||||
return;
|
||||
}
|
||||
const merged = mergePathPrepend(env.PATH, prepend);
|
||||
if (merged) {
|
||||
env.PATH = merged;
|
||||
}
|
||||
}
|
||||
|
||||
export function applyShellPath(env: Record<string, string>, shellPath?: string | null) {
|
||||
if (!shellPath) {
|
||||
return;
|
||||
}
|
||||
const entries = shellPath
|
||||
.split(path.delimiter)
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean);
|
||||
if (entries.length === 0) {
|
||||
return;
|
||||
}
|
||||
const merged = mergePathPrepend(env.PATH, entries);
|
||||
if (merged) {
|
||||
env.PATH = merged;
|
||||
}
|
||||
}
|
||||
|
||||
function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "failed") {
|
||||
if (!session.backgrounded || !session.notifyOnExit || session.exitNotified) {
|
||||
return;
|
||||
}
|
||||
const sessionKey = session.sessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
session.exitNotified = true;
|
||||
const exitLabel = session.exitSignal
|
||||
? `signal ${session.exitSignal}`
|
||||
: `code ${session.exitCode ?? 0}`;
|
||||
const output = normalizeNotifyOutput(
|
||||
tail(session.tail || session.aggregated || "", DEFAULT_NOTIFY_TAIL_CHARS),
|
||||
);
|
||||
const summary = output
|
||||
? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}`
|
||||
: `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`;
|
||||
enqueueSystemEvent(summary, { sessionKey });
|
||||
requestHeartbeatNow({ reason: `exec:${session.id}:exit` });
|
||||
}
|
||||
|
||||
export function createApprovalSlug(id: string) {
|
||||
return id.slice(0, APPROVAL_SLUG_LENGTH);
|
||||
}
|
||||
|
||||
export function resolveApprovalRunningNoticeMs(value?: number) {
|
||||
if (typeof value !== "number" || !Number.isFinite(value)) {
|
||||
return DEFAULT_APPROVAL_RUNNING_NOTICE_MS;
|
||||
}
|
||||
if (value <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return Math.floor(value);
|
||||
}
|
||||
|
||||
export function emitExecSystemEvent(
|
||||
text: string,
|
||||
opts: { sessionKey?: string; contextKey?: string },
|
||||
) {
|
||||
const sessionKey = opts.sessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
enqueueSystemEvent(text, { sessionKey, contextKey: opts.contextKey });
|
||||
requestHeartbeatNow({ reason: "exec-event" });
|
||||
}
|
||||
|
||||
export async function runExecProcess(opts: {
|
||||
command: string;
|
||||
workdir: string;
|
||||
env: Record<string, string>;
|
||||
sandbox?: BashSandboxConfig;
|
||||
containerWorkdir?: string | null;
|
||||
usePty: boolean;
|
||||
warnings: string[];
|
||||
maxOutput: number;
|
||||
pendingMaxOutput: number;
|
||||
notifyOnExit: boolean;
|
||||
scopeKey?: string;
|
||||
sessionKey?: string;
|
||||
timeoutSec: number;
|
||||
onUpdate?: (partialResult: AgentToolResult<ExecToolDetails>) => void;
|
||||
}): Promise<ExecProcessHandle> {
|
||||
const startedAt = Date.now();
|
||||
const sessionId = createSessionSlug();
|
||||
let child: ChildProcessWithoutNullStreams | null = null;
|
||||
let pty: PtyHandle | null = null;
|
||||
let stdin: SessionStdin | undefined;
|
||||
|
||||
if (opts.sandbox) {
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [
|
||||
"docker",
|
||||
...buildDockerExecArgs({
|
||||
containerName: opts.sandbox.containerName,
|
||||
command: opts.command,
|
||||
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
|
||||
env: opts.env,
|
||||
tty: opts.usePty,
|
||||
}),
|
||||
],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: process.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
} else if (opts.usePty) {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
try {
|
||||
const ptyModule = (await import("@lydell/node-pty")) as unknown as {
|
||||
spawn?: PtySpawn;
|
||||
default?: { spawn?: PtySpawn };
|
||||
};
|
||||
const spawnPty = ptyModule.spawn ?? ptyModule.default?.spawn;
|
||||
if (!spawnPty) {
|
||||
throw new Error("PTY support is unavailable (node-pty spawn not found).");
|
||||
}
|
||||
pty = spawnPty(shell, [...shellArgs, opts.command], {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
name: process.env.TERM ?? "xterm-256color",
|
||||
cols: 120,
|
||||
rows: 30,
|
||||
});
|
||||
stdin = {
|
||||
destroyed: false,
|
||||
write: (data, cb) => {
|
||||
try {
|
||||
pty?.write(data);
|
||||
cb?.(null);
|
||||
} catch (err) {
|
||||
cb?.(err as Error);
|
||||
}
|
||||
},
|
||||
end: () => {
|
||||
try {
|
||||
const eof = process.platform === "win32" ? "\x1a" : "\x04";
|
||||
pty?.write(eof);
|
||||
} catch {
|
||||
// ignore EOF errors
|
||||
}
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const errText = String(err);
|
||||
const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`;
|
||||
logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`);
|
||||
opts.warnings.push(warning);
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (fallbackErr, fallback) => {
|
||||
const fallbackText = formatSpawnError(fallbackErr);
|
||||
const fallbackWarning = `Warning: spawn failed (${fallbackText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${fallbackText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(fallbackWarning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
} else {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
|
||||
const session = {
|
||||
id: sessionId,
|
||||
command: opts.command,
|
||||
scopeKey: opts.scopeKey,
|
||||
sessionKey: opts.sessionKey,
|
||||
notifyOnExit: opts.notifyOnExit,
|
||||
exitNotified: false,
|
||||
child: child ?? undefined,
|
||||
stdin,
|
||||
pid: child?.pid ?? pty?.pid,
|
||||
startedAt,
|
||||
cwd: opts.workdir,
|
||||
maxOutputChars: opts.maxOutput,
|
||||
pendingMaxOutputChars: opts.pendingMaxOutput,
|
||||
totalOutputChars: 0,
|
||||
pendingStdout: [],
|
||||
pendingStderr: [],
|
||||
pendingStdoutChars: 0,
|
||||
pendingStderrChars: 0,
|
||||
aggregated: "",
|
||||
tail: "",
|
||||
exited: false,
|
||||
exitCode: undefined as number | null | undefined,
|
||||
exitSignal: undefined as NodeJS.Signals | number | null | undefined,
|
||||
truncated: false,
|
||||
backgrounded: false,
|
||||
} satisfies ProcessSession;
|
||||
addSession(session);
|
||||
|
||||
let settled = false;
|
||||
let timeoutTimer: NodeJS.Timeout | null = null;
|
||||
let timeoutFinalizeTimer: NodeJS.Timeout | null = null;
|
||||
let timedOut = false;
|
||||
const timeoutFinalizeMs = 1000;
|
||||
let resolveFn: ((outcome: ExecProcessOutcome) => void) | null = null;
|
||||
|
||||
const settle = (outcome: ExecProcessOutcome) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
resolveFn?.(outcome);
|
||||
};
|
||||
|
||||
const finalizeTimeout = () => {
|
||||
if (session.exited) {
|
||||
return;
|
||||
}
|
||||
markExited(session, null, "SIGKILL", "failed");
|
||||
maybeNotifyOnExit(session, "failed");
|
||||
const aggregated = session.aggregated.trim();
|
||||
const reason = `Command timed out after ${opts.timeoutSec} seconds`;
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: null,
|
||||
exitSignal: "SIGKILL",
|
||||
durationMs: Date.now() - startedAt,
|
||||
aggregated,
|
||||
timedOut: true,
|
||||
reason: aggregated ? `${aggregated}\n\n${reason}` : reason,
|
||||
});
|
||||
};
|
||||
|
||||
const onTimeout = () => {
|
||||
timedOut = true;
|
||||
killSession(session);
|
||||
if (!timeoutFinalizeTimer) {
|
||||
timeoutFinalizeTimer = setTimeout(() => {
|
||||
finalizeTimeout();
|
||||
}, timeoutFinalizeMs);
|
||||
}
|
||||
};
|
||||
|
||||
if (opts.timeoutSec > 0) {
|
||||
timeoutTimer = setTimeout(() => {
|
||||
onTimeout();
|
||||
}, opts.timeoutSec * 1000);
|
||||
}
|
||||
|
||||
const emitUpdate = () => {
|
||||
if (!opts.onUpdate) {
|
||||
return;
|
||||
}
|
||||
const tailText = session.tail || session.aggregated;
|
||||
const warningText = opts.warnings.length ? `${opts.warnings.join("\n")}\n\n` : "";
|
||||
opts.onUpdate({
|
||||
content: [{ type: "text", text: warningText + (tailText || "") }],
|
||||
details: {
|
||||
status: "running",
|
||||
sessionId,
|
||||
pid: session.pid ?? undefined,
|
||||
startedAt,
|
||||
cwd: session.cwd,
|
||||
tail: session.tail,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const handleStdout = (data: string) => {
|
||||
const str = sanitizeBinaryOutput(data.toString());
|
||||
for (const chunk of chunkString(str)) {
|
||||
appendOutput(session, "stdout", chunk);
|
||||
emitUpdate();
|
||||
}
|
||||
};
|
||||
|
||||
const handleStderr = (data: string) => {
|
||||
const str = sanitizeBinaryOutput(data.toString());
|
||||
for (const chunk of chunkString(str)) {
|
||||
appendOutput(session, "stderr", chunk);
|
||||
emitUpdate();
|
||||
}
|
||||
};
|
||||
|
||||
if (pty) {
|
||||
const cursorResponse = buildCursorPositionResponse();
|
||||
pty.onData((data) => {
|
||||
const raw = data.toString();
|
||||
const { cleaned, requests } = stripDsrRequests(raw);
|
||||
if (requests > 0) {
|
||||
for (let i = 0; i < requests; i += 1) {
|
||||
pty.write(cursorResponse);
|
||||
}
|
||||
}
|
||||
handleStdout(cleaned);
|
||||
});
|
||||
} else if (child) {
|
||||
child.stdout.on("data", handleStdout);
|
||||
child.stderr.on("data", handleStderr);
|
||||
}
|
||||
|
||||
const promise = new Promise<ExecProcessOutcome>((resolve) => {
|
||||
resolveFn = resolve;
|
||||
const handleExit = (code: number | null, exitSignal: NodeJS.Signals | number | null) => {
|
||||
if (timeoutTimer) {
|
||||
clearTimeout(timeoutTimer);
|
||||
}
|
||||
if (timeoutFinalizeTimer) {
|
||||
clearTimeout(timeoutFinalizeTimer);
|
||||
}
|
||||
const durationMs = Date.now() - startedAt;
|
||||
const wasSignal = exitSignal != null;
|
||||
const isSuccess = code === 0 && !wasSignal && !timedOut;
|
||||
const status: "completed" | "failed" = isSuccess ? "completed" : "failed";
|
||||
markExited(session, code, exitSignal, status);
|
||||
maybeNotifyOnExit(session, status);
|
||||
if (!session.child && session.stdin) {
|
||||
session.stdin.destroyed = true;
|
||||
}
|
||||
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
const aggregated = session.aggregated.trim();
|
||||
if (!isSuccess) {
|
||||
const reason = timedOut
|
||||
? `Command timed out after ${opts.timeoutSec} seconds`
|
||||
: wasSignal && exitSignal
|
||||
? `Command aborted by signal ${exitSignal}`
|
||||
: code === null
|
||||
? "Command aborted before exit code was captured"
|
||||
: `Command exited with code ${code}`;
|
||||
const message = aggregated ? `${aggregated}\n\n${reason}` : reason;
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: code ?? null,
|
||||
exitSignal: exitSignal ?? null,
|
||||
durationMs,
|
||||
aggregated,
|
||||
timedOut,
|
||||
reason: message,
|
||||
});
|
||||
return;
|
||||
}
|
||||
settle({
|
||||
status: "completed",
|
||||
exitCode: code ?? 0,
|
||||
exitSignal: exitSignal ?? null,
|
||||
durationMs,
|
||||
aggregated,
|
||||
timedOut: false,
|
||||
});
|
||||
};
|
||||
|
||||
if (pty) {
|
||||
pty.onExit((event) => {
|
||||
const rawSignal = event.signal ?? null;
|
||||
const normalizedSignal = rawSignal === 0 ? null : rawSignal;
|
||||
handleExit(event.exitCode ?? null, normalizedSignal);
|
||||
});
|
||||
} else if (child) {
|
||||
child.once("close", (code, exitSignal) => {
|
||||
handleExit(code, exitSignal);
|
||||
});
|
||||
|
||||
child.once("error", (err) => {
|
||||
if (timeoutTimer) {
|
||||
clearTimeout(timeoutTimer);
|
||||
}
|
||||
if (timeoutFinalizeTimer) {
|
||||
clearTimeout(timeoutFinalizeTimer);
|
||||
}
|
||||
markExited(session, null, null, "failed");
|
||||
maybeNotifyOnExit(session, "failed");
|
||||
const aggregated = session.aggregated.trim();
|
||||
const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err);
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: null,
|
||||
exitSignal: null,
|
||||
durationMs: Date.now() - startedAt,
|
||||
aggregated,
|
||||
timedOut,
|
||||
reason: message,
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
session,
|
||||
startedAt,
|
||||
pid: session.pid ?? undefined,
|
||||
promise,
|
||||
kill: () => killSession(session),
|
||||
};
|
||||
}
|
||||
@@ -1,8 +1,5 @@
|
||||
import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core";
|
||||
import type { ChildProcessWithoutNullStreams } from "node:child_process";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import crypto from "node:crypto";
|
||||
import path from "node:path";
|
||||
import type { BashSandboxConfig } from "./bash-tools.shared.js";
|
||||
import {
|
||||
type ExecAsk,
|
||||
@@ -19,163 +16,49 @@ import {
|
||||
resolveExecApprovals,
|
||||
resolveExecApprovalsFromFile,
|
||||
} from "../infra/exec-approvals.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { buildNodeShellCommand } from "../infra/node-shell.js";
|
||||
import {
|
||||
getShellPathFromLoginShell,
|
||||
resolveShellEnvFallbackTimeoutMs,
|
||||
} from "../infra/shell-env.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { logInfo, logWarn } from "../logger.js";
|
||||
import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js";
|
||||
import { logInfo } from "../logger.js";
|
||||
import { parseAgentSessionKey, resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
import { markBackgrounded, tail } from "./bash-process-registry.js";
|
||||
import {
|
||||
type ProcessSession,
|
||||
type SessionStdin,
|
||||
addSession,
|
||||
appendOutput,
|
||||
createSessionSlug,
|
||||
markBackgrounded,
|
||||
markExited,
|
||||
tail,
|
||||
} from "./bash-process-registry.js";
|
||||
DEFAULT_APPROVAL_REQUEST_TIMEOUT_MS,
|
||||
DEFAULT_APPROVAL_TIMEOUT_MS,
|
||||
DEFAULT_MAX_OUTPUT,
|
||||
DEFAULT_NOTIFY_TAIL_CHARS,
|
||||
DEFAULT_PATH,
|
||||
DEFAULT_PENDING_MAX_OUTPUT,
|
||||
applyPathPrepend,
|
||||
applyShellPath,
|
||||
createApprovalSlug,
|
||||
emitExecSystemEvent,
|
||||
normalizeExecAsk,
|
||||
normalizeExecHost,
|
||||
normalizeExecSecurity,
|
||||
normalizeNotifyOutput,
|
||||
normalizePathPrepend,
|
||||
renderExecHostLabel,
|
||||
resolveApprovalRunningNoticeMs,
|
||||
runExecProcess,
|
||||
execSchema,
|
||||
type ExecProcessHandle,
|
||||
validateHostEnv,
|
||||
} from "./bash-tools.exec-runtime.js";
|
||||
import {
|
||||
buildDockerExecArgs,
|
||||
buildSandboxEnv,
|
||||
chunkString,
|
||||
clampWithDefault,
|
||||
coerceEnv,
|
||||
killSession,
|
||||
readEnvInt,
|
||||
resolveSandboxWorkdir,
|
||||
resolveWorkdir,
|
||||
truncateMiddle,
|
||||
} from "./bash-tools.shared.js";
|
||||
import { buildCursorPositionResponse, stripDsrRequests } from "./pty-dsr.js";
|
||||
import { getShellConfig, sanitizeBinaryOutput } from "./shell-utils.js";
|
||||
import { callGatewayTool } from "./tools/gateway.js";
|
||||
import { listNodes, resolveNodeIdFromList } from "./tools/nodes-utils.js";
|
||||
|
||||
// Security: Blocklist of environment variables that could alter execution flow
|
||||
// or inject code when running on non-sandboxed hosts (Gateway/Node).
|
||||
const DANGEROUS_HOST_ENV_VARS = new Set([
|
||||
"LD_PRELOAD",
|
||||
"LD_LIBRARY_PATH",
|
||||
"LD_AUDIT",
|
||||
"DYLD_INSERT_LIBRARIES",
|
||||
"DYLD_LIBRARY_PATH",
|
||||
"NODE_OPTIONS",
|
||||
"NODE_PATH",
|
||||
"PYTHONPATH",
|
||||
"PYTHONHOME",
|
||||
"RUBYLIB",
|
||||
"PERL5LIB",
|
||||
"BASH_ENV",
|
||||
"ENV",
|
||||
"GCONV_PATH",
|
||||
"IFS",
|
||||
"SSLKEYLOGFILE",
|
||||
]);
|
||||
const DANGEROUS_HOST_ENV_PREFIXES = ["DYLD_", "LD_"];
|
||||
|
||||
// Centralized sanitization helper.
|
||||
// Throws an error if dangerous variables or PATH modifications are detected on the host.
|
||||
function validateHostEnv(env: Record<string, string>): void {
|
||||
for (const key of Object.keys(env)) {
|
||||
const upperKey = key.toUpperCase();
|
||||
|
||||
// 1. Block known dangerous variables (Fail Closed)
|
||||
if (DANGEROUS_HOST_ENV_PREFIXES.some((prefix) => upperKey.startsWith(prefix))) {
|
||||
throw new Error(
|
||||
`Security Violation: Environment variable '${key}' is forbidden during host execution.`,
|
||||
);
|
||||
}
|
||||
if (DANGEROUS_HOST_ENV_VARS.has(upperKey)) {
|
||||
throw new Error(
|
||||
`Security Violation: Environment variable '${key}' is forbidden during host execution.`,
|
||||
);
|
||||
}
|
||||
|
||||
// 2. Strictly block PATH modification on host
|
||||
// Allowing custom PATH on the gateway/node can lead to binary hijacking.
|
||||
if (upperKey === "PATH") {
|
||||
throw new Error(
|
||||
"Security Violation: Custom 'PATH' variable is forbidden during host execution.",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
const DEFAULT_MAX_OUTPUT = clampWithDefault(
|
||||
readEnvInt("PI_BASH_MAX_OUTPUT_CHARS"),
|
||||
200_000,
|
||||
1_000,
|
||||
200_000,
|
||||
);
|
||||
const DEFAULT_PENDING_MAX_OUTPUT = clampWithDefault(
|
||||
readEnvInt("OPENCLAW_BASH_PENDING_MAX_OUTPUT_CHARS"),
|
||||
200_000,
|
||||
1_000,
|
||||
200_000,
|
||||
);
|
||||
const DEFAULT_PATH =
|
||||
process.env.PATH ?? "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
|
||||
const DEFAULT_NOTIFY_TAIL_CHARS = 400;
|
||||
const DEFAULT_APPROVAL_TIMEOUT_MS = 120_000;
|
||||
const DEFAULT_APPROVAL_REQUEST_TIMEOUT_MS = 130_000;
|
||||
const DEFAULT_APPROVAL_RUNNING_NOTICE_MS = 10_000;
|
||||
const APPROVAL_SLUG_LENGTH = 8;
|
||||
|
||||
type PtyExitEvent = { exitCode: number; signal?: number };
|
||||
type PtyListener<T> = (event: T) => void;
|
||||
type PtyHandle = {
|
||||
pid: number;
|
||||
write: (data: string | Buffer) => void;
|
||||
onData: (listener: PtyListener<string>) => void;
|
||||
onExit: (listener: PtyListener<PtyExitEvent>) => void;
|
||||
};
|
||||
type PtySpawn = (
|
||||
file: string,
|
||||
args: string[] | string,
|
||||
options: {
|
||||
name?: string;
|
||||
cols?: number;
|
||||
rows?: number;
|
||||
cwd?: string;
|
||||
env?: Record<string, string>;
|
||||
},
|
||||
) => PtyHandle;
|
||||
type PtyModule = {
|
||||
spawn?: PtySpawn;
|
||||
default?: { spawn?: PtySpawn };
|
||||
};
|
||||
type PtyModuleLoader = () => Promise<PtyModule>;
|
||||
|
||||
const loadPtyModuleDefault: PtyModuleLoader = async () =>
|
||||
(await import("@lydell/node-pty")) as unknown as PtyModule;
|
||||
let loadPtyModule: PtyModuleLoader = loadPtyModuleDefault;
|
||||
|
||||
export function setPtyModuleLoaderForTests(loader?: PtyModuleLoader): void {
|
||||
loadPtyModule = loader ?? loadPtyModuleDefault;
|
||||
}
|
||||
|
||||
type ExecProcessOutcome = {
|
||||
status: "completed" | "failed";
|
||||
exitCode: number | null;
|
||||
exitSignal: NodeJS.Signals | number | null;
|
||||
durationMs: number;
|
||||
aggregated: string;
|
||||
timedOut: boolean;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
type ExecProcessHandle = {
|
||||
session: ProcessSession;
|
||||
startedAt: number;
|
||||
pid?: number;
|
||||
promise: Promise<ExecProcessOutcome>;
|
||||
kill: () => void;
|
||||
};
|
||||
|
||||
export type ExecToolDefaults = {
|
||||
host?: ExecHost;
|
||||
security?: ExecSecurity;
|
||||
@@ -205,54 +88,6 @@ export type ExecElevatedDefaults = {
|
||||
defaultLevel: "on" | "off" | "ask" | "full";
|
||||
};
|
||||
|
||||
const execSchema = Type.Object({
|
||||
command: Type.String({ description: "Shell command to execute" }),
|
||||
workdir: Type.Optional(Type.String({ description: "Working directory (defaults to cwd)" })),
|
||||
env: Type.Optional(Type.Record(Type.String(), Type.String())),
|
||||
yieldMs: Type.Optional(
|
||||
Type.Number({
|
||||
description: "Milliseconds to wait before backgrounding (default 10000)",
|
||||
}),
|
||||
),
|
||||
background: Type.Optional(Type.Boolean({ description: "Run in background immediately" })),
|
||||
timeout: Type.Optional(
|
||||
Type.Number({
|
||||
description: "Timeout in seconds (optional, kills process on expiry)",
|
||||
}),
|
||||
),
|
||||
pty: Type.Optional(
|
||||
Type.Boolean({
|
||||
description:
|
||||
"Run in a pseudo-terminal (PTY) when available (TTY-required CLIs, coding agents)",
|
||||
}),
|
||||
),
|
||||
elevated: Type.Optional(
|
||||
Type.Boolean({
|
||||
description: "Run on the host with elevated permissions (if allowed)",
|
||||
}),
|
||||
),
|
||||
host: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec host (sandbox|gateway|node).",
|
||||
}),
|
||||
),
|
||||
security: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec security mode (deny|allowlist|full).",
|
||||
}),
|
||||
),
|
||||
ask: Type.Optional(
|
||||
Type.String({
|
||||
description: "Exec ask mode (off|on-miss|always).",
|
||||
}),
|
||||
),
|
||||
node: Type.Optional(
|
||||
Type.String({
|
||||
description: "Node id/name for host=node.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
export type ExecToolDetails =
|
||||
| {
|
||||
status: "running";
|
||||
@@ -280,533 +115,6 @@ export type ExecToolDetails =
|
||||
nodeId?: string;
|
||||
};
|
||||
|
||||
function normalizeExecHost(value?: string | null): ExecHost | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "sandbox" || normalized === "gateway" || normalized === "node") {
|
||||
return normalized;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function normalizeExecSecurity(value?: string | null): ExecSecurity | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "deny" || normalized === "allowlist" || normalized === "full") {
|
||||
return normalized;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function normalizeExecAsk(value?: string | null): ExecAsk | null {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
if (normalized === "off" || normalized === "on-miss" || normalized === "always") {
|
||||
return normalized as ExecAsk;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function renderExecHostLabel(host: ExecHost) {
|
||||
return host === "sandbox" ? "sandbox" : host === "gateway" ? "gateway" : "node";
|
||||
}
|
||||
|
||||
function normalizeNotifyOutput(value: string) {
|
||||
return value.replace(/\s+/g, " ").trim();
|
||||
}
|
||||
|
||||
function normalizePathPrepend(entries?: string[]) {
|
||||
if (!Array.isArray(entries)) {
|
||||
return [];
|
||||
}
|
||||
const seen = new Set<string>();
|
||||
const normalized: string[] = [];
|
||||
for (const entry of entries) {
|
||||
if (typeof entry !== "string") {
|
||||
continue;
|
||||
}
|
||||
const trimmed = entry.trim();
|
||||
if (!trimmed || seen.has(trimmed)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(trimmed);
|
||||
normalized.push(trimmed);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function mergePathPrepend(existing: string | undefined, prepend: string[]) {
|
||||
if (prepend.length === 0) {
|
||||
return existing;
|
||||
}
|
||||
const partsExisting = (existing ?? "")
|
||||
.split(path.delimiter)
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean);
|
||||
const merged: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
for (const part of [...prepend, ...partsExisting]) {
|
||||
if (seen.has(part)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(part);
|
||||
merged.push(part);
|
||||
}
|
||||
return merged.join(path.delimiter);
|
||||
}
|
||||
|
||||
function applyPathPrepend(
|
||||
env: Record<string, string>,
|
||||
prepend: string[],
|
||||
options?: { requireExisting?: boolean },
|
||||
) {
|
||||
if (prepend.length === 0) {
|
||||
return;
|
||||
}
|
||||
if (options?.requireExisting && !env.PATH) {
|
||||
return;
|
||||
}
|
||||
const merged = mergePathPrepend(env.PATH, prepend);
|
||||
if (merged) {
|
||||
env.PATH = merged;
|
||||
}
|
||||
}
|
||||
|
||||
function applyShellPath(env: Record<string, string>, shellPath?: string | null) {
|
||||
if (!shellPath) {
|
||||
return;
|
||||
}
|
||||
const entries = shellPath
|
||||
.split(path.delimiter)
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean);
|
||||
if (entries.length === 0) {
|
||||
return;
|
||||
}
|
||||
const merged = mergePathPrepend(env.PATH, entries);
|
||||
if (merged) {
|
||||
env.PATH = merged;
|
||||
}
|
||||
}
|
||||
|
||||
function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "failed") {
|
||||
if (!session.backgrounded || !session.notifyOnExit || session.exitNotified) {
|
||||
return;
|
||||
}
|
||||
const sessionKey = session.sessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
session.exitNotified = true;
|
||||
const exitLabel = session.exitSignal
|
||||
? `signal ${session.exitSignal}`
|
||||
: `code ${session.exitCode ?? 0}`;
|
||||
const output = normalizeNotifyOutput(
|
||||
tail(session.tail || session.aggregated || "", DEFAULT_NOTIFY_TAIL_CHARS),
|
||||
);
|
||||
const summary = output
|
||||
? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}`
|
||||
: `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`;
|
||||
enqueueSystemEvent(summary, { sessionKey });
|
||||
requestHeartbeatNow({ reason: `exec:${session.id}:exit` });
|
||||
}
|
||||
|
||||
function createApprovalSlug(id: string) {
|
||||
return id.slice(0, APPROVAL_SLUG_LENGTH);
|
||||
}
|
||||
|
||||
function resolveApprovalRunningNoticeMs(value?: number) {
|
||||
if (typeof value !== "number" || !Number.isFinite(value)) {
|
||||
return DEFAULT_APPROVAL_RUNNING_NOTICE_MS;
|
||||
}
|
||||
if (value <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return Math.floor(value);
|
||||
}
|
||||
|
||||
function emitExecSystemEvent(text: string, opts: { sessionKey?: string; contextKey?: string }) {
|
||||
const sessionKey = opts.sessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
enqueueSystemEvent(text, { sessionKey, contextKey: opts.contextKey });
|
||||
requestHeartbeatNow({ reason: "exec-event" });
|
||||
}
|
||||
|
||||
async function runExecProcess(opts: {
|
||||
command: string;
|
||||
workdir: string;
|
||||
env: Record<string, string>;
|
||||
sandbox?: BashSandboxConfig;
|
||||
containerWorkdir?: string | null;
|
||||
usePty: boolean;
|
||||
warnings: string[];
|
||||
maxOutput: number;
|
||||
pendingMaxOutput: number;
|
||||
notifyOnExit: boolean;
|
||||
scopeKey?: string;
|
||||
sessionKey?: string;
|
||||
timeoutSec: number;
|
||||
onUpdate?: (partialResult: AgentToolResult<ExecToolDetails>) => void;
|
||||
}): Promise<ExecProcessHandle> {
|
||||
const startedAt = Date.now();
|
||||
const sessionId = createSessionSlug();
|
||||
let child: ChildProcessWithoutNullStreams | null = null;
|
||||
let pty: PtyHandle | null = null;
|
||||
let stdin: SessionStdin | undefined;
|
||||
|
||||
if (opts.sandbox) {
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [
|
||||
"docker",
|
||||
...buildDockerExecArgs({
|
||||
containerName: opts.sandbox.containerName,
|
||||
command: opts.command,
|
||||
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
|
||||
env: opts.env,
|
||||
tty: opts.usePty,
|
||||
}),
|
||||
],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: process.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
} else if (opts.usePty) {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
try {
|
||||
const ptyModule = await loadPtyModule();
|
||||
const spawnPty = ptyModule.spawn ?? ptyModule.default?.spawn;
|
||||
if (!spawnPty) {
|
||||
throw new Error("PTY support is unavailable (node-pty spawn not found).");
|
||||
}
|
||||
pty = spawnPty(shell, [...shellArgs, opts.command], {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
name: process.env.TERM ?? "xterm-256color",
|
||||
cols: 120,
|
||||
rows: 30,
|
||||
});
|
||||
stdin = {
|
||||
destroyed: false,
|
||||
write: (data, cb) => {
|
||||
try {
|
||||
pty?.write(data);
|
||||
cb?.(null);
|
||||
} catch (err) {
|
||||
cb?.(err as Error);
|
||||
}
|
||||
},
|
||||
end: () => {
|
||||
try {
|
||||
const eof = process.platform === "win32" ? "\x1a" : "\x04";
|
||||
pty?.write(eof);
|
||||
} catch {
|
||||
// ignore EOF errors
|
||||
}
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const errText = String(err);
|
||||
const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`;
|
||||
logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`);
|
||||
opts.warnings.push(warning);
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (fallbackErr, fallback) => {
|
||||
const fallbackText = formatSpawnError(fallbackErr);
|
||||
const fallbackWarning = `Warning: spawn failed (${fallbackText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${fallbackText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(fallbackWarning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
} else {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
|
||||
const session = {
|
||||
id: sessionId,
|
||||
command: opts.command,
|
||||
scopeKey: opts.scopeKey,
|
||||
sessionKey: opts.sessionKey,
|
||||
notifyOnExit: opts.notifyOnExit,
|
||||
exitNotified: false,
|
||||
child: child ?? undefined,
|
||||
stdin,
|
||||
pid: child?.pid ?? pty?.pid,
|
||||
startedAt,
|
||||
cwd: opts.workdir,
|
||||
maxOutputChars: opts.maxOutput,
|
||||
pendingMaxOutputChars: opts.pendingMaxOutput,
|
||||
totalOutputChars: 0,
|
||||
pendingStdout: [],
|
||||
pendingStderr: [],
|
||||
pendingStdoutChars: 0,
|
||||
pendingStderrChars: 0,
|
||||
aggregated: "",
|
||||
tail: "",
|
||||
exited: false,
|
||||
exitCode: undefined as number | null | undefined,
|
||||
exitSignal: undefined as NodeJS.Signals | number | null | undefined,
|
||||
truncated: false,
|
||||
backgrounded: false,
|
||||
} satisfies ProcessSession;
|
||||
addSession(session);
|
||||
|
||||
let settled = false;
|
||||
let timeoutTimer: NodeJS.Timeout | null = null;
|
||||
let timeoutFinalizeTimer: NodeJS.Timeout | null = null;
|
||||
let timedOut = false;
|
||||
const timeoutFinalizeMs = 1000;
|
||||
let resolveFn: ((outcome: ExecProcessOutcome) => void) | null = null;
|
||||
|
||||
const settle = (outcome: ExecProcessOutcome) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
resolveFn?.(outcome);
|
||||
};
|
||||
|
||||
const finalizeTimeout = () => {
|
||||
if (session.exited) {
|
||||
return;
|
||||
}
|
||||
markExited(session, null, "SIGKILL", "failed");
|
||||
maybeNotifyOnExit(session, "failed");
|
||||
const aggregated = session.aggregated.trim();
|
||||
const reason = `Command timed out after ${opts.timeoutSec} seconds`;
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: null,
|
||||
exitSignal: "SIGKILL",
|
||||
durationMs: Date.now() - startedAt,
|
||||
aggregated,
|
||||
timedOut: true,
|
||||
reason: aggregated ? `${aggregated}\n\n${reason}` : reason,
|
||||
});
|
||||
};
|
||||
|
||||
const onTimeout = () => {
|
||||
timedOut = true;
|
||||
killSession(session);
|
||||
if (!timeoutFinalizeTimer) {
|
||||
timeoutFinalizeTimer = setTimeout(() => {
|
||||
finalizeTimeout();
|
||||
}, timeoutFinalizeMs);
|
||||
}
|
||||
};
|
||||
|
||||
if (opts.timeoutSec > 0) {
|
||||
timeoutTimer = setTimeout(() => {
|
||||
onTimeout();
|
||||
}, opts.timeoutSec * 1000);
|
||||
}
|
||||
|
||||
const emitUpdate = () => {
|
||||
if (!opts.onUpdate) {
|
||||
return;
|
||||
}
|
||||
const tailText = session.tail || session.aggregated;
|
||||
const warningText = opts.warnings.length ? `${opts.warnings.join("\n")}\n\n` : "";
|
||||
opts.onUpdate({
|
||||
content: [{ type: "text", text: warningText + (tailText || "") }],
|
||||
details: {
|
||||
status: "running",
|
||||
sessionId,
|
||||
pid: session.pid ?? undefined,
|
||||
startedAt,
|
||||
cwd: session.cwd,
|
||||
tail: session.tail,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const handleStdout = (data: string) => {
|
||||
const str = sanitizeBinaryOutput(data.toString());
|
||||
for (const chunk of chunkString(str)) {
|
||||
appendOutput(session, "stdout", chunk);
|
||||
emitUpdate();
|
||||
}
|
||||
};
|
||||
|
||||
const handleStderr = (data: string) => {
|
||||
const str = sanitizeBinaryOutput(data.toString());
|
||||
for (const chunk of chunkString(str)) {
|
||||
appendOutput(session, "stderr", chunk);
|
||||
emitUpdate();
|
||||
}
|
||||
};
|
||||
|
||||
if (pty) {
|
||||
const cursorResponse = buildCursorPositionResponse();
|
||||
pty.onData((data) => {
|
||||
const raw = data.toString();
|
||||
const { cleaned, requests } = stripDsrRequests(raw);
|
||||
if (requests > 0) {
|
||||
for (let i = 0; i < requests; i += 1) {
|
||||
pty.write(cursorResponse);
|
||||
}
|
||||
}
|
||||
handleStdout(cleaned);
|
||||
});
|
||||
} else if (child) {
|
||||
child.stdout.on("data", handleStdout);
|
||||
child.stderr.on("data", handleStderr);
|
||||
}
|
||||
|
||||
const promise = new Promise<ExecProcessOutcome>((resolve) => {
|
||||
resolveFn = resolve;
|
||||
const handleExit = (code: number | null, exitSignal: NodeJS.Signals | number | null) => {
|
||||
if (timeoutTimer) {
|
||||
clearTimeout(timeoutTimer);
|
||||
}
|
||||
if (timeoutFinalizeTimer) {
|
||||
clearTimeout(timeoutFinalizeTimer);
|
||||
}
|
||||
const durationMs = Date.now() - startedAt;
|
||||
const wasSignal = exitSignal != null;
|
||||
const isSuccess = code === 0 && !wasSignal && !timedOut;
|
||||
const status: "completed" | "failed" = isSuccess ? "completed" : "failed";
|
||||
markExited(session, code, exitSignal, status);
|
||||
maybeNotifyOnExit(session, status);
|
||||
if (!session.child && session.stdin) {
|
||||
session.stdin.destroyed = true;
|
||||
}
|
||||
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
const aggregated = session.aggregated.trim();
|
||||
if (!isSuccess) {
|
||||
const reason = timedOut
|
||||
? `Command timed out after ${opts.timeoutSec} seconds`
|
||||
: wasSignal && exitSignal
|
||||
? `Command aborted by signal ${exitSignal}`
|
||||
: code === null
|
||||
? "Command aborted before exit code was captured"
|
||||
: `Command exited with code ${code}`;
|
||||
const message = aggregated ? `${aggregated}\n\n${reason}` : reason;
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: code ?? null,
|
||||
exitSignal: exitSignal ?? null,
|
||||
durationMs,
|
||||
aggregated,
|
||||
timedOut,
|
||||
reason: message,
|
||||
});
|
||||
return;
|
||||
}
|
||||
settle({
|
||||
status: "completed",
|
||||
exitCode: code ?? 0,
|
||||
exitSignal: exitSignal ?? null,
|
||||
durationMs,
|
||||
aggregated,
|
||||
timedOut: false,
|
||||
});
|
||||
};
|
||||
|
||||
if (pty) {
|
||||
pty.onExit((event) => {
|
||||
const rawSignal = event.signal ?? null;
|
||||
const normalizedSignal = rawSignal === 0 ? null : rawSignal;
|
||||
handleExit(event.exitCode ?? null, normalizedSignal);
|
||||
});
|
||||
} else if (child) {
|
||||
child.once("close", (code, exitSignal) => {
|
||||
handleExit(code, exitSignal);
|
||||
});
|
||||
|
||||
child.once("error", (err) => {
|
||||
if (timeoutTimer) {
|
||||
clearTimeout(timeoutTimer);
|
||||
}
|
||||
if (timeoutFinalizeTimer) {
|
||||
clearTimeout(timeoutFinalizeTimer);
|
||||
}
|
||||
markExited(session, null, null, "failed");
|
||||
maybeNotifyOnExit(session, "failed");
|
||||
const aggregated = session.aggregated.trim();
|
||||
const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err);
|
||||
settle({
|
||||
status: "failed",
|
||||
exitCode: null,
|
||||
exitSignal: null,
|
||||
durationMs: Date.now() - startedAt,
|
||||
aggregated,
|
||||
timedOut,
|
||||
reason: message,
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
session,
|
||||
startedAt,
|
||||
pid: session.pid ?? undefined,
|
||||
promise,
|
||||
kill: () => killSession(session),
|
||||
};
|
||||
}
|
||||
|
||||
export function createExecTool(
|
||||
defaults?: ExecToolDefaults,
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
@@ -1135,51 +443,29 @@ export function createExecTool(
|
||||
if (requiresAsk) {
|
||||
const approvalId = crypto.randomUUID();
|
||||
const approvalSlug = createApprovalSlug(approvalId);
|
||||
const expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
|
||||
const contextKey = `exec:${approvalId}`;
|
||||
const noticeSeconds = Math.max(1, Math.round(approvalRunningNoticeMs / 1000));
|
||||
const warningText = warnings.length ? `${warnings.join("\n")}\n\n` : "";
|
||||
|
||||
// Register the approval with expectFinal:false to get immediate confirmation.
|
||||
// This ensures the approval ID is valid before we return.
|
||||
let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
|
||||
try {
|
||||
const registrationResult = await callGatewayTool<{
|
||||
status?: string;
|
||||
expiresAtMs?: number;
|
||||
}>(
|
||||
"exec.approval.request",
|
||||
{ timeoutMs: 10_000 },
|
||||
{
|
||||
id: approvalId,
|
||||
command: commandText,
|
||||
cwd: workdir,
|
||||
host: "node",
|
||||
security: hostSecurity,
|
||||
ask: hostAsk,
|
||||
agentId,
|
||||
resolvedPath: undefined,
|
||||
sessionKey: defaults?.sessionKey,
|
||||
timeoutMs: DEFAULT_APPROVAL_TIMEOUT_MS,
|
||||
twoPhase: true,
|
||||
},
|
||||
{ expectFinal: false },
|
||||
);
|
||||
if (registrationResult?.expiresAtMs) {
|
||||
expiresAtMs = registrationResult.expiresAtMs;
|
||||
}
|
||||
} catch (err) {
|
||||
// Registration failed - throw to caller
|
||||
throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err });
|
||||
}
|
||||
|
||||
// Fire-and-forget: wait for decision via waitDecision endpoint, then execute.
|
||||
void (async () => {
|
||||
let decision: string | null = null;
|
||||
try {
|
||||
const decisionResult = await callGatewayTool<{ decision?: string }>(
|
||||
"exec.approval.waitDecision",
|
||||
const decisionResult = await callGatewayTool<{ decision: string }>(
|
||||
"exec.approval.request",
|
||||
{ timeoutMs: DEFAULT_APPROVAL_REQUEST_TIMEOUT_MS },
|
||||
{ id: approvalId },
|
||||
{
|
||||
id: approvalId,
|
||||
command: commandText,
|
||||
cwd: workdir,
|
||||
host: "node",
|
||||
security: hostSecurity,
|
||||
ask: hostAsk,
|
||||
agentId,
|
||||
resolvedPath: undefined,
|
||||
sessionKey: defaults?.sessionKey,
|
||||
timeoutMs: DEFAULT_APPROVAL_TIMEOUT_MS,
|
||||
},
|
||||
);
|
||||
const decisionValue =
|
||||
decisionResult && typeof decisionResult === "object"
|
||||
@@ -1337,6 +623,7 @@ export function createExecTool(
|
||||
if (requiresAsk) {
|
||||
const approvalId = crypto.randomUUID();
|
||||
const approvalSlug = createApprovalSlug(approvalId);
|
||||
const expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
|
||||
const contextKey = `exec:${approvalId}`;
|
||||
const resolvedPath = allowlistEval.segments[0]?.resolution?.resolvedPath;
|
||||
const noticeSeconds = Math.max(1, Math.round(approvalRunningNoticeMs / 1000));
|
||||
@@ -1345,47 +632,24 @@ export function createExecTool(
|
||||
typeof params.timeout === "number" ? params.timeout : defaultTimeoutSec;
|
||||
const warningText = warnings.length ? `${warnings.join("\n")}\n\n` : "";
|
||||
|
||||
// Register the approval with expectFinal:false to get immediate confirmation.
|
||||
// This ensures the approval ID is valid before we return.
|
||||
let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
|
||||
try {
|
||||
const registrationResult = await callGatewayTool<{
|
||||
status?: string;
|
||||
expiresAtMs?: number;
|
||||
}>(
|
||||
"exec.approval.request",
|
||||
{ timeoutMs: 10_000 },
|
||||
{
|
||||
id: approvalId,
|
||||
command: commandText,
|
||||
cwd: workdir,
|
||||
host: "gateway",
|
||||
security: hostSecurity,
|
||||
ask: hostAsk,
|
||||
agentId,
|
||||
resolvedPath,
|
||||
sessionKey: defaults?.sessionKey,
|
||||
timeoutMs: DEFAULT_APPROVAL_TIMEOUT_MS,
|
||||
twoPhase: true,
|
||||
},
|
||||
{ expectFinal: false },
|
||||
);
|
||||
if (registrationResult?.expiresAtMs) {
|
||||
expiresAtMs = registrationResult.expiresAtMs;
|
||||
}
|
||||
} catch (err) {
|
||||
// Registration failed - throw to caller
|
||||
throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err });
|
||||
}
|
||||
|
||||
// Fire-and-forget: wait for decision via waitDecision endpoint, then execute.
|
||||
void (async () => {
|
||||
let decision: string | null = null;
|
||||
try {
|
||||
const decisionResult = await callGatewayTool<{ decision?: string }>(
|
||||
"exec.approval.waitDecision",
|
||||
const decisionResult = await callGatewayTool<{ decision: string }>(
|
||||
"exec.approval.request",
|
||||
{ timeoutMs: DEFAULT_APPROVAL_REQUEST_TIMEOUT_MS },
|
||||
{ id: approvalId },
|
||||
{
|
||||
id: approvalId,
|
||||
command: commandText,
|
||||
cwd: workdir,
|
||||
host: "gateway",
|
||||
security: hostSecurity,
|
||||
ask: hostAsk,
|
||||
agentId,
|
||||
resolvedPath,
|
||||
sessionKey: defaults?.sessionKey,
|
||||
timeoutMs: DEFAULT_APPROVAL_TIMEOUT_MS,
|
||||
},
|
||||
);
|
||||
const decisionValue =
|
||||
decisionResult && typeof decisionResult === "object"
|
||||
|
||||
Reference in New Issue
Block a user