refactor(memory): share post-json helper across remote fetchers

This commit is contained in:
Peter Steinberger
2026-02-22 20:03:56 +00:00
parent 2dcb244985
commit 6ef4eda1f0
7 changed files with 249 additions and 38 deletions

View File

@@ -0,0 +1,78 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { retryAsync } from "../infra/retry.js";
import { postJsonWithRetry } from "./batch-http.js";
import { postJson } from "./post-json.js";
vi.mock("../infra/retry.js", () => ({
retryAsync: vi.fn(async (run: () => Promise<unknown>) => await run()),
}));
vi.mock("./post-json.js", () => ({
postJson: vi.fn(),
}));
describe("postJsonWithRetry", () => {
const retryAsyncMock = vi.mocked(retryAsync);
const postJsonMock = vi.mocked(postJson);
beforeEach(() => {
vi.clearAllMocks();
});
it("posts JSON and returns parsed response payload", async () => {
postJsonMock.mockImplementationOnce(async (params) => {
return await params.parse({ ok: true, ids: [1, 2] });
});
const result = await postJsonWithRetry<{ ok: boolean; ids: number[] }>({
url: "https://memory.example/v1/batch",
headers: { Authorization: "Bearer test" },
body: { chunks: ["a", "b"] },
errorPrefix: "memory batch failed",
});
expect(result).toEqual({ ok: true, ids: [1, 2] });
expect(postJsonMock).toHaveBeenCalledWith(
expect.objectContaining({
url: "https://memory.example/v1/batch",
headers: { Authorization: "Bearer test" },
body: { chunks: ["a", "b"] },
errorPrefix: "memory batch failed",
attachStatus: true,
}),
);
const retryOptions = retryAsyncMock.mock.calls[0]?.[1] as
| {
attempts: number;
minDelayMs: number;
maxDelayMs: number;
shouldRetry: (err: unknown) => boolean;
}
| undefined;
expect(retryOptions?.attempts).toBe(3);
expect(retryOptions?.minDelayMs).toBe(300);
expect(retryOptions?.maxDelayMs).toBe(2000);
expect(retryOptions?.shouldRetry({ status: 429 })).toBe(true);
expect(retryOptions?.shouldRetry({ status: 503 })).toBe(true);
expect(retryOptions?.shouldRetry({ status: 400 })).toBe(false);
});
it("attaches status to non-ok errors", async () => {
postJsonMock.mockRejectedValueOnce(
Object.assign(new Error("memory batch failed: 503 backend down"), { status: 503 }),
);
await expect(
postJsonWithRetry({
url: "https://memory.example/v1/batch",
headers: {},
body: { chunks: [] },
errorPrefix: "memory batch failed",
}),
).rejects.toMatchObject({
message: expect.stringContaining("memory batch failed: 503 backend down"),
status: 503,
});
});
});

View File

@@ -1,6 +1,6 @@
import type { SsrFPolicy } from "../infra/net/ssrf.js";
import { retryAsync } from "../infra/retry.js";
import { withRemoteHttpResponse } from "./remote-http.js";
import { postJson } from "./post-json.js";
export async function postJsonWithRetry<T>(params: {
url: string;
@@ -11,25 +11,14 @@ export async function postJsonWithRetry<T>(params: {
}): Promise<T> {
return await retryAsync(
async () => {
return await withRemoteHttpResponse({
return await postJson<T>({
url: params.url,
headers: params.headers,
ssrfPolicy: params.ssrfPolicy,
init: {
method: "POST",
headers: params.headers,
body: JSON.stringify(params.body),
},
onResponse: async (res) => {
if (!res.ok) {
const text = await res.text();
const err = new Error(`${params.errorPrefix}: ${res.status} ${text}`) as Error & {
status?: number;
};
err.status = res.status;
throw err;
}
return (await res.json()) as T;
},
body: params.body,
errorPrefix: params.errorPrefix,
attachStatus: true,
parse: async (payload) => payload as T,
});
},
{

View File

@@ -0,0 +1,53 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { fetchRemoteEmbeddingVectors } from "./embeddings-remote-fetch.js";
import { postJson } from "./post-json.js";
vi.mock("./post-json.js", () => ({
postJson: vi.fn(),
}));
describe("fetchRemoteEmbeddingVectors", () => {
const postJsonMock = vi.mocked(postJson);
beforeEach(() => {
vi.clearAllMocks();
});
it("maps remote embedding response data to vectors", async () => {
postJsonMock.mockImplementationOnce(async (params) => {
return await params.parse({
data: [{ embedding: [0.1, 0.2] }, {}, { embedding: [0.3] }],
});
});
const vectors = await fetchRemoteEmbeddingVectors({
url: "https://memory.example/v1/embeddings",
headers: { Authorization: "Bearer test" },
body: { input: ["one", "two", "three"] },
errorPrefix: "embedding fetch failed",
});
expect(vectors).toEqual([[0.1, 0.2], [], [0.3]]);
expect(postJsonMock).toHaveBeenCalledWith(
expect.objectContaining({
url: "https://memory.example/v1/embeddings",
headers: { Authorization: "Bearer test" },
body: { input: ["one", "two", "three"] },
errorPrefix: "embedding fetch failed",
}),
);
});
it("throws a status-rich error on non-ok responses", async () => {
postJsonMock.mockRejectedValueOnce(new Error("embedding fetch failed: 403 forbidden"));
await expect(
fetchRemoteEmbeddingVectors({
url: "https://memory.example/v1/embeddings",
headers: {},
body: { input: ["one"] },
errorPrefix: "embedding fetch failed",
}),
).rejects.toThrow("embedding fetch failed: 403 forbidden");
});
});

View File

@@ -1,5 +1,5 @@
import type { SsrFPolicy } from "../infra/net/ssrf.js";
import { withRemoteHttpResponse } from "./remote-http.js";
import { postJson } from "./post-json.js";
export async function fetchRemoteEmbeddingVectors(params: {
url: string;
@@ -8,23 +8,17 @@ export async function fetchRemoteEmbeddingVectors(params: {
body: unknown;
errorPrefix: string;
}): Promise<number[][]> {
return await withRemoteHttpResponse({
return await postJson({
url: params.url,
headers: params.headers,
ssrfPolicy: params.ssrfPolicy,
init: {
method: "POST",
headers: params.headers,
body: JSON.stringify(params.body),
},
onResponse: async (res) => {
if (!res.ok) {
const text = await res.text();
throw new Error(`${params.errorPrefix}: ${res.status} ${text}`);
}
const payload = (await res.json()) as {
body: params.body,
errorPrefix: params.errorPrefix,
parse: (payload) => {
const typedPayload = payload as {
data?: Array<{ embedding?: number[] }>;
};
const data = payload.data ?? [];
const data = typedPayload.data ?? [];
return data.map((entry) => entry.embedding ?? []);
},
});

View File

@@ -26,18 +26,27 @@ describe("memory vector dedupe", () => {
let indexPath: string;
let manager: MemoryIndexManager | null = null;
async function seedMemoryWorkspace(rootDir: string) {
await fs.mkdir(path.join(rootDir, "memory"));
await fs.writeFile(path.join(rootDir, "MEMORY.md"), "Hello memory.");
}
async function closeManagerIfOpen() {
if (!manager) {
return;
}
await manager.close();
manager = null;
}
beforeEach(async () => {
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-"));
indexPath = path.join(workspaceDir, "index.sqlite");
await fs.mkdir(path.join(workspaceDir, "memory"));
await fs.writeFile(path.join(workspaceDir, "MEMORY.md"), "Hello memory.");
await seedMemoryWorkspace(workspaceDir);
});
afterEach(async () => {
if (manager) {
await manager.close();
manager = null;
}
await closeManagerIfOpen();
await fs.rm(workspaceDir, { recursive: true, force: true });
});

View File

@@ -0,0 +1,53 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { postJson } from "./post-json.js";
import { withRemoteHttpResponse } from "./remote-http.js";
vi.mock("./remote-http.js", () => ({
withRemoteHttpResponse: vi.fn(),
}));
describe("postJson", () => {
const remoteHttpMock = vi.mocked(withRemoteHttpResponse);
beforeEach(() => {
vi.clearAllMocks();
});
it("parses JSON payload on successful response", async () => {
remoteHttpMock.mockImplementationOnce(async (params) => {
return await params.onResponse(
new Response(JSON.stringify({ data: [{ embedding: [1, 2] }] }), { status: 200 }),
);
});
const result = await postJson({
url: "https://memory.example/v1/post",
headers: { Authorization: "Bearer test" },
body: { input: ["x"] },
errorPrefix: "post failed",
parse: (payload) => payload,
});
expect(result).toEqual({ data: [{ embedding: [1, 2] }] });
});
it("attaches status to thrown error when requested", async () => {
remoteHttpMock.mockImplementationOnce(async (params) => {
return await params.onResponse(new Response("bad gateway", { status: 502 }));
});
await expect(
postJson({
url: "https://memory.example/v1/post",
headers: {},
body: {},
errorPrefix: "post failed",
attachStatus: true,
parse: () => ({}),
}),
).rejects.toMatchObject({
message: expect.stringContaining("post failed: 502 bad gateway"),
status: 502,
});
});
});

35
src/memory/post-json.ts Normal file
View File

@@ -0,0 +1,35 @@
import type { SsrFPolicy } from "../infra/net/ssrf.js";
import { withRemoteHttpResponse } from "./remote-http.js";
export async function postJson<T>(params: {
url: string;
headers: Record<string, string>;
ssrfPolicy?: SsrFPolicy;
body: unknown;
errorPrefix: string;
attachStatus?: boolean;
parse: (payload: unknown) => T | Promise<T>;
}): Promise<T> {
return await withRemoteHttpResponse({
url: params.url,
ssrfPolicy: params.ssrfPolicy,
init: {
method: "POST",
headers: params.headers,
body: JSON.stringify(params.body),
},
onResponse: async (res) => {
if (!res.ok) {
const text = await res.text();
const err = new Error(`${params.errorPrefix}: ${res.status} ${text}`) as Error & {
status?: number;
};
if (params.attachStatus) {
err.status = res.status;
}
throw err;
}
return await params.parse(await res.json());
},
});
}