Files
Moltbot/src/agents/bash-tools.process.ts
Adam d07d24eebe fix: clamp poll sleep duration to non-negative in bash-tools process (#24889)
`Math.min(250, deadline - Date.now())` could return a negative value if
the deadline expired between the while-condition check and the setTimeout
call. Wrap with `Math.max(0, ...)` to ensure the sleep is never negative.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 03:22:58 +00:00

656 lines
22 KiB
TypeScript

import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core";
import { Type } from "@sinclair/typebox";
import { formatDurationCompact } from "../infra/format-time/format-duration.ts";
import { getDiagnosticSessionState } from "../logging/diagnostic-session-state.js";
import { killProcessTree } from "../process/kill-tree.js";
import { getProcessSupervisor } from "../process/supervisor/index.js";
import {
type ProcessSession,
deleteSession,
drainSession,
getFinishedSession,
getSession,
listFinishedSessions,
listRunningSessions,
markExited,
setJobTtlMs,
} from "./bash-process-registry.js";
import { deriveSessionName, pad, sliceLogLines, truncateMiddle } from "./bash-tools.shared.js";
import { recordCommandPoll, resetCommandPollCount } from "./command-poll-backoff.js";
import { encodeKeySequence, encodePaste } from "./pty-keys.js";
export type ProcessToolDefaults = {
cleanupMs?: number;
scopeKey?: string;
};
type WritableStdin = {
write: (data: string, cb?: (err?: Error | null) => void) => void;
end: () => void;
destroyed?: boolean;
};
const DEFAULT_LOG_TAIL_LINES = 200;
function resolveLogSliceWindow(offset?: number, limit?: number) {
const usingDefaultTail = offset === undefined && limit === undefined;
const effectiveLimit =
typeof limit === "number" && Number.isFinite(limit)
? limit
: usingDefaultTail
? DEFAULT_LOG_TAIL_LINES
: undefined;
return { effectiveOffset: offset, effectiveLimit, usingDefaultTail };
}
function defaultTailNote(totalLines: number, usingDefaultTail: boolean) {
if (!usingDefaultTail || totalLines <= DEFAULT_LOG_TAIL_LINES) {
return "";
}
return `\n\n[showing last ${DEFAULT_LOG_TAIL_LINES} of ${totalLines} lines; pass offset/limit to page]`;
}
const processSchema = Type.Object({
action: Type.String({ description: "Process action" }),
sessionId: Type.Optional(Type.String({ description: "Session id for actions other than list" })),
data: Type.Optional(Type.String({ description: "Data to write for write" })),
keys: Type.Optional(
Type.Array(Type.String(), { description: "Key tokens to send for send-keys" }),
),
hex: Type.Optional(Type.Array(Type.String(), { description: "Hex bytes to send for send-keys" })),
literal: Type.Optional(Type.String({ description: "Literal string for send-keys" })),
text: Type.Optional(Type.String({ description: "Text to paste for paste" })),
bracketed: Type.Optional(Type.Boolean({ description: "Wrap paste in bracketed mode" })),
eof: Type.Optional(Type.Boolean({ description: "Close stdin after write" })),
offset: Type.Optional(Type.Number({ description: "Log offset" })),
limit: Type.Optional(Type.Number({ description: "Log length" })),
timeout: Type.Optional(
Type.Number({
description: "For poll: wait up to this many milliseconds before returning",
minimum: 0,
}),
),
});
const MAX_POLL_WAIT_MS = 120_000;
function resolvePollWaitMs(value: unknown) {
if (typeof value === "number" && Number.isFinite(value)) {
return Math.max(0, Math.min(MAX_POLL_WAIT_MS, Math.floor(value)));
}
if (typeof value === "string") {
const parsed = Number.parseInt(value.trim(), 10);
if (Number.isFinite(parsed)) {
return Math.max(0, Math.min(MAX_POLL_WAIT_MS, parsed));
}
}
return 0;
}
function failText(text: string): AgentToolResult<unknown> {
return {
content: [
{
type: "text",
text,
},
],
details: { status: "failed" },
};
}
function recordPollRetrySuggestion(sessionId: string, hasNewOutput: boolean): number | undefined {
try {
const sessionState = getDiagnosticSessionState({ sessionId });
return recordCommandPoll(sessionState, sessionId, hasNewOutput);
} catch {
return undefined;
}
}
function resetPollRetrySuggestion(sessionId: string): void {
try {
const sessionState = getDiagnosticSessionState({ sessionId });
resetCommandPollCount(sessionState, sessionId);
} catch {
// Ignore diagnostics state failures for process tool behavior.
}
}
export function createProcessTool(
defaults?: ProcessToolDefaults,
// oxlint-disable-next-line typescript/no-explicit-any
): AgentTool<any, unknown> {
if (defaults?.cleanupMs !== undefined) {
setJobTtlMs(defaults.cleanupMs);
}
const scopeKey = defaults?.scopeKey;
const supervisor = getProcessSupervisor();
const isInScope = (session?: { scopeKey?: string } | null) =>
!scopeKey || session?.scopeKey === scopeKey;
const cancelManagedSession = (sessionId: string) => {
const record = supervisor.getRecord(sessionId);
if (!record || record.state === "exited") {
return false;
}
supervisor.cancel(sessionId, "manual-cancel");
return true;
};
const terminateSessionFallback = (session: ProcessSession) => {
const pid = session.pid ?? session.child?.pid;
if (typeof pid !== "number" || !Number.isFinite(pid) || pid <= 0) {
return false;
}
killProcessTree(pid);
return true;
};
return {
name: "process",
label: "process",
description:
"Manage running exec sessions: list, poll, log, write, send-keys, submit, paste, kill.",
parameters: processSchema,
execute: async (_toolCallId, args, _signal, _onUpdate): Promise<AgentToolResult<unknown>> => {
const params = args as {
action:
| "list"
| "poll"
| "log"
| "write"
| "send-keys"
| "submit"
| "paste"
| "kill"
| "clear"
| "remove";
sessionId?: string;
data?: string;
keys?: string[];
hex?: string[];
literal?: string;
text?: string;
bracketed?: boolean;
eof?: boolean;
offset?: number;
limit?: number;
timeout?: unknown;
};
if (params.action === "list") {
const running = listRunningSessions()
.filter((s) => isInScope(s))
.map((s) => ({
sessionId: s.id,
status: "running",
pid: s.pid ?? undefined,
startedAt: s.startedAt,
runtimeMs: Date.now() - s.startedAt,
cwd: s.cwd,
command: s.command,
name: deriveSessionName(s.command),
tail: s.tail,
truncated: s.truncated,
}));
const finished = listFinishedSessions()
.filter((s) => isInScope(s))
.map((s) => ({
sessionId: s.id,
status: s.status,
startedAt: s.startedAt,
endedAt: s.endedAt,
runtimeMs: s.endedAt - s.startedAt,
cwd: s.cwd,
command: s.command,
name: deriveSessionName(s.command),
tail: s.tail,
truncated: s.truncated,
exitCode: s.exitCode ?? undefined,
exitSignal: s.exitSignal ?? undefined,
}));
const lines = [...running, ...finished]
.toSorted((a, b) => b.startedAt - a.startedAt)
.map((s) => {
const label = s.name ? truncateMiddle(s.name, 80) : truncateMiddle(s.command, 120);
return `${s.sessionId} ${pad(s.status, 9)} ${formatDurationCompact(s.runtimeMs) ?? "n/a"} :: ${label}`;
});
return {
content: [
{
type: "text",
text: lines.join("\n") || "No running or recent sessions.",
},
],
details: { status: "completed", sessions: [...running, ...finished] },
};
}
if (!params.sessionId) {
return {
content: [{ type: "text", text: "sessionId is required for this action." }],
details: { status: "failed" },
};
}
const session = getSession(params.sessionId);
const finished = getFinishedSession(params.sessionId);
const scopedSession = isInScope(session) ? session : undefined;
const scopedFinished = isInScope(finished) ? finished : undefined;
const failedResult = (text: string): AgentToolResult<unknown> => ({
content: [{ type: "text", text }],
details: { status: "failed" },
});
const resolveBackgroundedWritableStdin = () => {
if (!scopedSession) {
return {
ok: false as const,
result: failedResult(`No active session found for ${params.sessionId}`),
};
}
if (!scopedSession.backgrounded) {
return {
ok: false as const,
result: failedResult(`Session ${params.sessionId} is not backgrounded.`),
};
}
const stdin = scopedSession.stdin ?? scopedSession.child?.stdin;
if (!stdin || stdin.destroyed) {
return {
ok: false as const,
result: failedResult(`Session ${params.sessionId} stdin is not writable.`),
};
}
return { ok: true as const, session: scopedSession, stdin: stdin as WritableStdin };
};
const writeToStdin = async (stdin: WritableStdin, data: string) => {
await new Promise<void>((resolve, reject) => {
stdin.write(data, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
};
const runningSessionResult = (
session: ProcessSession,
text: string,
): AgentToolResult<unknown> => ({
content: [{ type: "text", text }],
details: {
status: "running",
sessionId: params.sessionId,
name: deriveSessionName(session.command),
},
});
switch (params.action) {
case "poll": {
if (!scopedSession) {
if (scopedFinished) {
resetPollRetrySuggestion(params.sessionId);
return {
content: [
{
type: "text",
text:
(scopedFinished.tail ||
`(no output recorded${
scopedFinished.truncated ? " — truncated to cap" : ""
})`) +
`\n\nProcess exited with ${
scopedFinished.exitSignal
? `signal ${scopedFinished.exitSignal}`
: `code ${scopedFinished.exitCode ?? 0}`
}.`,
},
],
details: {
status: scopedFinished.status === "completed" ? "completed" : "failed",
sessionId: params.sessionId,
exitCode: scopedFinished.exitCode ?? undefined,
aggregated: scopedFinished.aggregated,
name: deriveSessionName(scopedFinished.command),
},
};
}
resetPollRetrySuggestion(params.sessionId);
return failText(`No session found for ${params.sessionId}`);
}
if (!scopedSession.backgrounded) {
return failText(`Session ${params.sessionId} is not backgrounded.`);
}
const pollWaitMs = resolvePollWaitMs(params.timeout);
if (pollWaitMs > 0 && !scopedSession.exited) {
const deadline = Date.now() + pollWaitMs;
while (!scopedSession.exited && Date.now() < deadline) {
await new Promise((resolve) =>
setTimeout(resolve, Math.max(0, Math.min(250, deadline - Date.now()))),
);
}
}
const { stdout, stderr } = drainSession(scopedSession);
const exited = scopedSession.exited;
const exitCode = scopedSession.exitCode ?? 0;
const exitSignal = scopedSession.exitSignal ?? undefined;
if (exited) {
const status = exitCode === 0 && exitSignal == null ? "completed" : "failed";
markExited(
scopedSession,
scopedSession.exitCode ?? null,
scopedSession.exitSignal ?? null,
status,
);
}
const status = exited
? exitCode === 0 && exitSignal == null
? "completed"
: "failed"
: "running";
const output = [stdout.trimEnd(), stderr.trimEnd()].filter(Boolean).join("\n").trim();
const hasNewOutput = output.length > 0;
const retryInMs = exited
? undefined
: recordPollRetrySuggestion(params.sessionId, hasNewOutput);
if (exited) {
resetPollRetrySuggestion(params.sessionId);
}
return {
content: [
{
type: "text",
text:
(output || "(no new output)") +
(exited
? `\n\nProcess exited with ${
exitSignal ? `signal ${exitSignal}` : `code ${exitCode}`
}.`
: "\n\nProcess still running."),
},
],
details: {
status,
sessionId: params.sessionId,
exitCode: exited ? exitCode : undefined,
aggregated: scopedSession.aggregated,
name: deriveSessionName(scopedSession.command),
...(typeof retryInMs === "number" ? { retryInMs } : {}),
},
};
}
case "log": {
if (scopedSession) {
if (!scopedSession.backgrounded) {
return {
content: [
{
type: "text",
text: `Session ${params.sessionId} is not backgrounded.`,
},
],
details: { status: "failed" },
};
}
const window = resolveLogSliceWindow(params.offset, params.limit);
const { slice, totalLines, totalChars } = sliceLogLines(
scopedSession.aggregated,
window.effectiveOffset,
window.effectiveLimit,
);
const logDefaultTailNote = defaultTailNote(totalLines, window.usingDefaultTail);
return {
content: [{ type: "text", text: (slice || "(no output yet)") + logDefaultTailNote }],
details: {
status: scopedSession.exited ? "completed" : "running",
sessionId: params.sessionId,
total: totalLines,
totalLines,
totalChars,
truncated: scopedSession.truncated,
name: deriveSessionName(scopedSession.command),
},
};
}
if (scopedFinished) {
const window = resolveLogSliceWindow(params.offset, params.limit);
const { slice, totalLines, totalChars } = sliceLogLines(
scopedFinished.aggregated,
window.effectiveOffset,
window.effectiveLimit,
);
const status = scopedFinished.status === "completed" ? "completed" : "failed";
const logDefaultTailNote = defaultTailNote(totalLines, window.usingDefaultTail);
return {
content: [
{ type: "text", text: (slice || "(no output recorded)") + logDefaultTailNote },
],
details: {
status,
sessionId: params.sessionId,
total: totalLines,
totalLines,
totalChars,
truncated: scopedFinished.truncated,
exitCode: scopedFinished.exitCode ?? undefined,
exitSignal: scopedFinished.exitSignal ?? undefined,
name: deriveSessionName(scopedFinished.command),
},
};
}
return {
content: [
{
type: "text",
text: `No session found for ${params.sessionId}`,
},
],
details: { status: "failed" },
};
}
case "write": {
const resolved = resolveBackgroundedWritableStdin();
if (!resolved.ok) {
return resolved.result;
}
await writeToStdin(resolved.stdin, params.data ?? "");
if (params.eof) {
resolved.stdin.end();
}
return runningSessionResult(
resolved.session,
`Wrote ${(params.data ?? "").length} bytes to session ${params.sessionId}${
params.eof ? " (stdin closed)" : ""
}.`,
);
}
case "send-keys": {
const resolved = resolveBackgroundedWritableStdin();
if (!resolved.ok) {
return resolved.result;
}
const { data, warnings } = encodeKeySequence({
keys: params.keys,
hex: params.hex,
literal: params.literal,
});
if (!data) {
return {
content: [
{
type: "text",
text: "No key data provided.",
},
],
details: { status: "failed" },
};
}
await writeToStdin(resolved.stdin, data);
return runningSessionResult(
resolved.session,
`Sent ${data.length} bytes to session ${params.sessionId}.` +
(warnings.length ? `\nWarnings:\n- ${warnings.join("\n- ")}` : ""),
);
}
case "submit": {
const resolved = resolveBackgroundedWritableStdin();
if (!resolved.ok) {
return resolved.result;
}
await writeToStdin(resolved.stdin, "\r");
return runningSessionResult(
resolved.session,
`Submitted session ${params.sessionId} (sent CR).`,
);
}
case "paste": {
const resolved = resolveBackgroundedWritableStdin();
if (!resolved.ok) {
return resolved.result;
}
const payload = encodePaste(params.text ?? "", params.bracketed !== false);
if (!payload) {
return {
content: [
{
type: "text",
text: "No paste text provided.",
},
],
details: { status: "failed" },
};
}
await writeToStdin(resolved.stdin, payload);
return runningSessionResult(
resolved.session,
`Pasted ${params.text?.length ?? 0} chars to session ${params.sessionId}.`,
);
}
case "kill": {
if (!scopedSession) {
return failText(`No active session found for ${params.sessionId}`);
}
if (!scopedSession.backgrounded) {
return failText(`Session ${params.sessionId} is not backgrounded.`);
}
const canceled = cancelManagedSession(scopedSession.id);
if (!canceled) {
const terminated = terminateSessionFallback(scopedSession);
if (!terminated) {
return failText(
`Unable to terminate session ${params.sessionId}: no active supervisor run or process id.`,
);
}
markExited(scopedSession, null, "SIGKILL", "failed");
}
resetPollRetrySuggestion(params.sessionId);
return {
content: [
{
type: "text",
text: canceled
? `Termination requested for session ${params.sessionId}.`
: `Killed session ${params.sessionId}.`,
},
],
details: {
status: "failed",
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,
},
};
}
case "clear": {
if (scopedFinished) {
resetPollRetrySuggestion(params.sessionId);
deleteSession(params.sessionId);
return {
content: [{ type: "text", text: `Cleared session ${params.sessionId}.` }],
details: { status: "completed" },
};
}
return {
content: [
{
type: "text",
text: `No finished session found for ${params.sessionId}`,
},
],
details: { status: "failed" },
};
}
case "remove": {
if (scopedSession) {
const canceled = cancelManagedSession(scopedSession.id);
if (canceled) {
// Keep remove semantics deterministic: drop from process registry now.
scopedSession.backgrounded = false;
deleteSession(params.sessionId);
} else {
const terminated = terminateSessionFallback(scopedSession);
if (!terminated) {
return failText(
`Unable to remove session ${params.sessionId}: no active supervisor run or process id.`,
);
}
markExited(scopedSession, null, "SIGKILL", "failed");
deleteSession(params.sessionId);
}
resetPollRetrySuggestion(params.sessionId);
return {
content: [
{
type: "text",
text: canceled
? `Removed session ${params.sessionId} (termination requested).`
: `Removed session ${params.sessionId}.`,
},
],
details: {
status: "failed",
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,
},
};
}
if (scopedFinished) {
resetPollRetrySuggestion(params.sessionId);
deleteSession(params.sessionId);
return {
content: [{ type: "text", text: `Removed session ${params.sessionId}.` }],
details: { status: "completed" },
};
}
return {
content: [
{
type: "text",
text: `No session found for ${params.sessionId}`,
},
],
details: { status: "failed" },
};
}
}
return {
content: [{ type: "text", text: `Unknown action ${params.action as string}` }],
details: { status: "failed" },
};
},
};
}
export const processTool = createProcessTool();