diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts new file mode 100644 index 000000000..f8c6bfd39 --- /dev/null +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -0,0 +1,75 @@ +import { SessionManager } from "@mariozechner/pi-coding-agent"; + +type AppendMessageArg = Parameters[0]; + +export type GatewayInjectedAbortMeta = { + aborted: true; + origin: "rpc" | "stop-command"; + runId: string; +}; + +export type GatewayInjectedTranscriptAppendResult = { + ok: boolean; + messageId?: string; + message?: Record; + error?: string; +}; + +export function appendInjectedAssistantMessageToTranscript(params: { + transcriptPath: string; + message: string; + label?: string; + idempotencyKey?: string; + abortMeta?: GatewayInjectedAbortMeta; + now?: number; +}): GatewayInjectedTranscriptAppendResult { + const now = params.now ?? Date.now(); + const labelPrefix = params.label ? `[${params.label}]\n\n` : ""; + const usage = { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }; + const messageBody: AppendMessageArg & Record = { + role: "assistant", + content: [{ type: "text", text: `${labelPrefix}${params.message}` }], + timestamp: now, + // Pi stopReason is a strict enum; this is not model output, but we still store it as a + // normal assistant message so it participates in the session parentId chain. + stopReason: "stop", + usage, + // Make these explicit so downstream tooling never treats this as model output. + api: "openai-responses", + provider: "openclaw", + model: "gateway-injected", + ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), + ...(params.abortMeta + ? { + openclawAbort: { + aborted: true, + origin: params.abortMeta.origin, + runId: params.abortMeta.runId, + }, + } + : {}), + }; + + try { + // IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId. + // Raw jsonl appends break the parent chain and can hide compaction summaries from context. + const sessionManager = SessionManager.open(params.transcriptPath); + const messageId = sessionManager.appendMessage(messageBody); + return { ok: true, messageId, message: messageBody }; + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } +} diff --git a/src/gateway/server-methods/chat.inject.parentid.test.ts b/src/gateway/server-methods/chat.inject.parentid.test.ts index b25cbc3fb..12a4a38f1 100644 --- a/src/gateway/server-methods/chat.inject.parentid.test.ts +++ b/src/gateway/server-methods/chat.inject.parentid.test.ts @@ -1,62 +1,37 @@ import fs from "node:fs"; -import { describe, expect, it, vi } from "vitest"; -import { createMockSessionEntry, createTranscriptFixtureSync } from "./chat.test-helpers.js"; -import type { GatewayRequestContext } from "./types.js"; +import { describe, expect, it } from "vitest"; +import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; +import { createTranscriptFixtureSync } from "./chat.test-helpers.js"; // Guardrail: Ensure gateway "injected" assistant transcript messages are appended via SessionManager, // so they are attached to the current leaf with a `parentId` and do not sever compaction history. describe("gateway chat.inject transcript writes", () => { it("appends a Pi session entry that includes parentId", async () => { - const sessionId = "sess-1"; - const { transcriptPath } = createTranscriptFixtureSync({ + const { dir, transcriptPath } = createTranscriptFixtureSync({ prefix: "openclaw-chat-inject-", - sessionId, + sessionId: "sess-1", }); - vi.doMock("../session-utils.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - loadSessionEntry: () => - createMockSessionEntry({ - transcriptPath, - sessionId, - canonicalKey: "k1", - }), - }; - }); + try { + const appended = appendInjectedAssistantMessageToTranscript({ + transcriptPath, + message: "hello", + }); + expect(appended.ok).toBe(true); + expect(appended.messageId).toBeTruthy(); - const { chatHandlers } = await import("./chat.js"); + const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean); + expect(lines.length).toBeGreaterThanOrEqual(2); - const respond = vi.fn(); - type InjectCtx = Pick; - const context: InjectCtx = { - broadcast: vi.fn() as unknown as InjectCtx["broadcast"], - nodeSendToSession: vi.fn() as unknown as InjectCtx["nodeSendToSession"], - }; - await chatHandlers["chat.inject"]({ - params: { sessionKey: "k1", message: "hello" }, - respond, - req: {} as never, - client: null as never, - isWebchatConnect: () => false, - context: context as unknown as GatewayRequestContext, - }); + const last = JSON.parse(lines.at(-1) as string) as Record; + expect(last.type).toBe("message"); - expect(respond).toHaveBeenCalled(); - const [, payload, error] = respond.mock.calls.at(-1) ?? []; - expect(error).toBeUndefined(); - expect(payload).toMatchObject({ ok: true }); - - const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean); - expect(lines.length).toBeGreaterThanOrEqual(2); - - const last = JSON.parse(lines.at(-1) as string) as Record; - expect(last.type).toBe("message"); - - // The regression we saw: raw jsonl appends omitted this field entirely. - expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(true); - expect(last).toHaveProperty("id"); - expect(last).toHaveProperty("message"); + // The regression we saw: raw jsonl appends omitted this field entirely. + expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(true); + expect(last).toHaveProperty("id"); + expect(last).toHaveProperty("message"); + } finally { + fs.rmSync(dir, { recursive: true, force: true }); + } }); }); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index c26050655..e0d8de855 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1,6 +1,6 @@ import fs from "node:fs"; import path from "node:path"; -import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; @@ -45,6 +45,7 @@ import { import { formatForLog } from "../ws-log.js"; import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js"; +import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; type TranscriptAppendResult = { @@ -54,7 +55,6 @@ type TranscriptAppendResult = { error?: string; }; -type AppendMessageArg = Parameters[0]; type AbortOrigin = "rpc" | "stop-command"; type AbortedPartialSnapshot = { @@ -376,55 +376,13 @@ function appendAssistantTranscriptMessage(params: { return { ok: true }; } - const now = Date.now(); - const labelPrefix = params.label ? `[${params.label}]\n\n` : ""; - const usage = { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0, - }, - }; - const messageBody: AppendMessageArg & Record = { - role: "assistant", - content: [{ type: "text", text: `${labelPrefix}${params.message}` }], - timestamp: now, - // Pi stopReason is a strict enum; this is not model output, but we still store it as a - // normal assistant message so it participates in the session parentId chain. - stopReason: "stop", - usage, - // Make these explicit so downstream tooling never treats this as model output. - api: "openai-responses", - provider: "openclaw", - model: "gateway-injected", - ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), - ...(params.abortMeta - ? { - openclawAbort: { - aborted: true, - origin: params.abortMeta.origin, - runId: params.abortMeta.runId, - }, - } - : {}), - }; - - try { - // IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId. - // Raw jsonl appends break the parent chain and can hide compaction summaries from context. - const sessionManager = SessionManager.open(transcriptPath); - const messageId = sessionManager.appendMessage(messageBody); - return { ok: true, messageId, message: messageBody }; - } catch (err) { - return { ok: false, error: err instanceof Error ? err.message : String(err) }; - } + return appendInjectedAssistantMessageToTranscript({ + transcriptPath, + message: params.message, + label: params.label, + idempotencyKey: params.idempotencyKey, + abortMeta: params.abortMeta, + }); } function collectSessionAbortPartials(params: { diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 174f23f3d..084ca2af1 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -224,14 +224,18 @@ describe("sanitizeChatSendMessageInput", () => { }); describe("gateway chat transcript writes (guardrail)", () => { - it("does not append transcript messages via raw fs.appendFileSync(transcriptPath, ...)", () => { + it("routes transcript writes through helper and SessionManager parentId append", () => { const chatTs = fileURLToPath(new URL("./chat.ts", import.meta.url)); - const src = fs.readFileSync(chatTs, "utf-8"); + const chatSrc = fs.readFileSync(chatTs, "utf-8"); + const helperTs = fileURLToPath(new URL("./chat-transcript-inject.ts", import.meta.url)); + const helperSrc = fs.readFileSync(helperTs, "utf-8"); - expect(src.includes("fs.appendFileSync(transcriptPath")).toBe(false); + expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false); + expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript("); - expect(src).toContain("SessionManager.open(transcriptPath)"); - expect(src).toContain("appendMessage("); + expect(helperSrc.includes("fs.appendFileSync(params.transcriptPath")).toBe(false); + expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)"); + expect(helperSrc).toContain("appendMessage(messageBody)"); }); }); diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index d72e54f6b..276c83540 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -28,7 +28,7 @@ installConnectedControlUiServerSuite((started) => { port = started.port; }); -async function waitFor(condition: () => boolean, timeoutMs = 1500) { +async function waitFor(condition: () => boolean, timeoutMs = 400) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (condition()) { @@ -402,7 +402,7 @@ describe("gateway server chat", () => { { const waitP = rpcReq(webchatWs, "agent.wait", { runId: "run-wait-1", - timeoutMs: 1000, + timeoutMs: 200, }); setTimeout(() => { @@ -428,7 +428,7 @@ describe("gateway server chat", () => { const res = await rpcReq(webchatWs, "agent.wait", { runId: "run-wait-early", - timeoutMs: 1000, + timeoutMs: 200, }); expect(res.ok).toBe(true); expect(res.payload?.status).toBe("ok"); @@ -447,7 +447,7 @@ describe("gateway server chat", () => { { const waitP = rpcReq(webchatWs, "agent.wait", { runId: "run-wait-err", - timeoutMs: 1000, + timeoutMs: 50, }); setTimeout(() => { @@ -466,7 +466,7 @@ describe("gateway server chat", () => { { const waitP = rpcReq(webchatWs, "agent.wait", { runId: "run-wait-start", - timeoutMs: 1000, + timeoutMs: 200, }); emitAgentEvent({