refactor: extract gateway transcript append helper
This commit is contained in:
75
src/gateway/server-methods/chat-transcript-inject.ts
Normal file
75
src/gateway/server-methods/chat-transcript-inject.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
|
||||
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
|
||||
|
||||
export type GatewayInjectedAbortMeta = {
|
||||
aborted: true;
|
||||
origin: "rpc" | "stop-command";
|
||||
runId: string;
|
||||
};
|
||||
|
||||
export type GatewayInjectedTranscriptAppendResult = {
|
||||
ok: boolean;
|
||||
messageId?: string;
|
||||
message?: Record<string, unknown>;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export function appendInjectedAssistantMessageToTranscript(params: {
|
||||
transcriptPath: string;
|
||||
message: string;
|
||||
label?: string;
|
||||
idempotencyKey?: string;
|
||||
abortMeta?: GatewayInjectedAbortMeta;
|
||||
now?: number;
|
||||
}): GatewayInjectedTranscriptAppendResult {
|
||||
const now = params.now ?? Date.now();
|
||||
const labelPrefix = params.label ? `[${params.label}]\n\n` : "";
|
||||
const usage = {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
};
|
||||
const messageBody: AppendMessageArg & Record<string, unknown> = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: `${labelPrefix}${params.message}` }],
|
||||
timestamp: now,
|
||||
// Pi stopReason is a strict enum; this is not model output, but we still store it as a
|
||||
// normal assistant message so it participates in the session parentId chain.
|
||||
stopReason: "stop",
|
||||
usage,
|
||||
// Make these explicit so downstream tooling never treats this as model output.
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "gateway-injected",
|
||||
...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}),
|
||||
...(params.abortMeta
|
||||
? {
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: params.abortMeta.origin,
|
||||
runId: params.abortMeta.runId,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
try {
|
||||
// IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId.
|
||||
// Raw jsonl appends break the parent chain and can hide compaction summaries from context.
|
||||
const sessionManager = SessionManager.open(params.transcriptPath);
|
||||
const messageId = sessionManager.appendMessage(messageBody);
|
||||
return { ok: true, messageId, message: messageBody };
|
||||
} catch (err) {
|
||||
return { ok: false, error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
}
|
||||
@@ -1,62 +1,37 @@
|
||||
import fs from "node:fs";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createMockSessionEntry, createTranscriptFixtureSync } from "./chat.test-helpers.js";
|
||||
import type { GatewayRequestContext } from "./types.js";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js";
|
||||
import { createTranscriptFixtureSync } from "./chat.test-helpers.js";
|
||||
|
||||
// Guardrail: Ensure gateway "injected" assistant transcript messages are appended via SessionManager,
|
||||
// so they are attached to the current leaf with a `parentId` and do not sever compaction history.
|
||||
describe("gateway chat.inject transcript writes", () => {
|
||||
it("appends a Pi session entry that includes parentId", async () => {
|
||||
const sessionId = "sess-1";
|
||||
const { transcriptPath } = createTranscriptFixtureSync({
|
||||
const { dir, transcriptPath } = createTranscriptFixtureSync({
|
||||
prefix: "openclaw-chat-inject-",
|
||||
sessionId,
|
||||
sessionId: "sess-1",
|
||||
});
|
||||
|
||||
vi.doMock("../session-utils.js", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("../session-utils.js")>();
|
||||
return {
|
||||
...original,
|
||||
loadSessionEntry: () =>
|
||||
createMockSessionEntry({
|
||||
transcriptPath,
|
||||
sessionId,
|
||||
canonicalKey: "k1",
|
||||
}),
|
||||
};
|
||||
});
|
||||
try {
|
||||
const appended = appendInjectedAssistantMessageToTranscript({
|
||||
transcriptPath,
|
||||
message: "hello",
|
||||
});
|
||||
expect(appended.ok).toBe(true);
|
||||
expect(appended.messageId).toBeTruthy();
|
||||
|
||||
const { chatHandlers } = await import("./chat.js");
|
||||
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean);
|
||||
expect(lines.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
const respond = vi.fn();
|
||||
type InjectCtx = Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession">;
|
||||
const context: InjectCtx = {
|
||||
broadcast: vi.fn() as unknown as InjectCtx["broadcast"],
|
||||
nodeSendToSession: vi.fn() as unknown as InjectCtx["nodeSendToSession"],
|
||||
};
|
||||
await chatHandlers["chat.inject"]({
|
||||
params: { sessionKey: "k1", message: "hello" },
|
||||
respond,
|
||||
req: {} as never,
|
||||
client: null as never,
|
||||
isWebchatConnect: () => false,
|
||||
context: context as unknown as GatewayRequestContext,
|
||||
});
|
||||
const last = JSON.parse(lines.at(-1) as string) as Record<string, unknown>;
|
||||
expect(last.type).toBe("message");
|
||||
|
||||
expect(respond).toHaveBeenCalled();
|
||||
const [, payload, error] = respond.mock.calls.at(-1) ?? [];
|
||||
expect(error).toBeUndefined();
|
||||
expect(payload).toMatchObject({ ok: true });
|
||||
|
||||
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean);
|
||||
expect(lines.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
const last = JSON.parse(lines.at(-1) as string) as Record<string, unknown>;
|
||||
expect(last.type).toBe("message");
|
||||
|
||||
// The regression we saw: raw jsonl appends omitted this field entirely.
|
||||
expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(true);
|
||||
expect(last).toHaveProperty("id");
|
||||
expect(last).toHaveProperty("message");
|
||||
// The regression we saw: raw jsonl appends omitted this field entirely.
|
||||
expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(true);
|
||||
expect(last).toHaveProperty("id");
|
||||
expect(last).toHaveProperty("message");
|
||||
} finally {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { resolveThinkingDefault } from "../../agents/model-selection.js";
|
||||
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||
@@ -45,6 +45,7 @@ import {
|
||||
import { formatForLog } from "../ws-log.js";
|
||||
import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js";
|
||||
import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js";
|
||||
import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js";
|
||||
import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
type TranscriptAppendResult = {
|
||||
@@ -54,7 +55,6 @@ type TranscriptAppendResult = {
|
||||
error?: string;
|
||||
};
|
||||
|
||||
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
|
||||
type AbortOrigin = "rpc" | "stop-command";
|
||||
|
||||
type AbortedPartialSnapshot = {
|
||||
@@ -376,55 +376,13 @@ function appendAssistantTranscriptMessage(params: {
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const labelPrefix = params.label ? `[${params.label}]\n\n` : "";
|
||||
const usage = {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
};
|
||||
const messageBody: AppendMessageArg & Record<string, unknown> = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: `${labelPrefix}${params.message}` }],
|
||||
timestamp: now,
|
||||
// Pi stopReason is a strict enum; this is not model output, but we still store it as a
|
||||
// normal assistant message so it participates in the session parentId chain.
|
||||
stopReason: "stop",
|
||||
usage,
|
||||
// Make these explicit so downstream tooling never treats this as model output.
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "gateway-injected",
|
||||
...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}),
|
||||
...(params.abortMeta
|
||||
? {
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: params.abortMeta.origin,
|
||||
runId: params.abortMeta.runId,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
try {
|
||||
// IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId.
|
||||
// Raw jsonl appends break the parent chain and can hide compaction summaries from context.
|
||||
const sessionManager = SessionManager.open(transcriptPath);
|
||||
const messageId = sessionManager.appendMessage(messageBody);
|
||||
return { ok: true, messageId, message: messageBody };
|
||||
} catch (err) {
|
||||
return { ok: false, error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
return appendInjectedAssistantMessageToTranscript({
|
||||
transcriptPath,
|
||||
message: params.message,
|
||||
label: params.label,
|
||||
idempotencyKey: params.idempotencyKey,
|
||||
abortMeta: params.abortMeta,
|
||||
});
|
||||
}
|
||||
|
||||
function collectSessionAbortPartials(params: {
|
||||
|
||||
@@ -224,14 +224,18 @@ describe("sanitizeChatSendMessageInput", () => {
|
||||
});
|
||||
|
||||
describe("gateway chat transcript writes (guardrail)", () => {
|
||||
it("does not append transcript messages via raw fs.appendFileSync(transcriptPath, ...)", () => {
|
||||
it("routes transcript writes through helper and SessionManager parentId append", () => {
|
||||
const chatTs = fileURLToPath(new URL("./chat.ts", import.meta.url));
|
||||
const src = fs.readFileSync(chatTs, "utf-8");
|
||||
const chatSrc = fs.readFileSync(chatTs, "utf-8");
|
||||
const helperTs = fileURLToPath(new URL("./chat-transcript-inject.ts", import.meta.url));
|
||||
const helperSrc = fs.readFileSync(helperTs, "utf-8");
|
||||
|
||||
expect(src.includes("fs.appendFileSync(transcriptPath")).toBe(false);
|
||||
expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false);
|
||||
expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript(");
|
||||
|
||||
expect(src).toContain("SessionManager.open(transcriptPath)");
|
||||
expect(src).toContain("appendMessage(");
|
||||
expect(helperSrc.includes("fs.appendFileSync(params.transcriptPath")).toBe(false);
|
||||
expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)");
|
||||
expect(helperSrc).toContain("appendMessage(messageBody)");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ installConnectedControlUiServerSuite((started) => {
|
||||
port = started.port;
|
||||
});
|
||||
|
||||
async function waitFor(condition: () => boolean, timeoutMs = 1500) {
|
||||
async function waitFor(condition: () => boolean, timeoutMs = 400) {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
if (condition()) {
|
||||
@@ -402,7 +402,7 @@ describe("gateway server chat", () => {
|
||||
{
|
||||
const waitP = rpcReq(webchatWs, "agent.wait", {
|
||||
runId: "run-wait-1",
|
||||
timeoutMs: 1000,
|
||||
timeoutMs: 200,
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
@@ -428,7 +428,7 @@ describe("gateway server chat", () => {
|
||||
|
||||
const res = await rpcReq(webchatWs, "agent.wait", {
|
||||
runId: "run-wait-early",
|
||||
timeoutMs: 1000,
|
||||
timeoutMs: 200,
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
expect(res.payload?.status).toBe("ok");
|
||||
@@ -447,7 +447,7 @@ describe("gateway server chat", () => {
|
||||
{
|
||||
const waitP = rpcReq(webchatWs, "agent.wait", {
|
||||
runId: "run-wait-err",
|
||||
timeoutMs: 1000,
|
||||
timeoutMs: 50,
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
@@ -466,7 +466,7 @@ describe("gateway server chat", () => {
|
||||
{
|
||||
const waitP = rpcReq(webchatWs, "agent.wait", {
|
||||
runId: "run-wait-start",
|
||||
timeoutMs: 1000,
|
||||
timeoutMs: 200,
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
|
||||
Reference in New Issue
Block a user