diff --git a/src/memory/batch-gemini.ts b/src/memory/batch-gemini.ts index 19a4f69fa..50f3b3f94 100644 --- a/src/memory/batch-gemini.ts +++ b/src/memory/batch-gemini.ts @@ -3,6 +3,7 @@ import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; import { debugEmbeddingsLog } from "./embeddings-debug.js"; import type { GeminiEmbeddingClient } from "./embeddings-gemini.js"; import { hashText } from "./internal.js"; +import { withRemoteHttpResponse } from "./remote-http.js"; export type GeminiBatchRequest = { custom_id: string; @@ -93,19 +94,25 @@ async function submitGeminiBatch(params: { baseUrl, requests: params.requests.length, }); - const fileRes = await fetch(uploadUrl, { - method: "POST", - headers: { - ...buildBatchHeaders(params.gemini, { json: false }), - "Content-Type": uploadPayload.contentType, + const filePayload = await withRemoteHttpResponse({ + url: uploadUrl, + ssrfPolicy: params.gemini.ssrfPolicy, + init: { + method: "POST", + headers: { + ...buildBatchHeaders(params.gemini, { json: false }), + "Content-Type": uploadPayload.contentType, + }, + body: uploadPayload.body, + }, + onResponse: async (fileRes) => { + if (!fileRes.ok) { + const text = await fileRes.text(); + throw new Error(`gemini batch file upload failed: ${fileRes.status} ${text}`); + } + return (await fileRes.json()) as { name?: string; file?: { name?: string } }; }, - body: uploadPayload.body, }); - if (!fileRes.ok) { - const text = await fileRes.text(); - throw new Error(`gemini batch file upload failed: ${fileRes.status} ${text}`); - } - const filePayload = (await fileRes.json()) as { name?: string; file?: { name?: string } }; const fileId = filePayload.name ?? 
filePayload.file?.name; if (!fileId) { throw new Error("gemini batch file upload failed: missing file id"); @@ -125,21 +132,27 @@ async function submitGeminiBatch(params: { batchEndpoint, fileId, }); - const batchRes = await fetch(batchEndpoint, { - method: "POST", - headers: buildBatchHeaders(params.gemini, { json: true }), - body: JSON.stringify(batchBody), + return await withRemoteHttpResponse({ + url: batchEndpoint, + ssrfPolicy: params.gemini.ssrfPolicy, + init: { + method: "POST", + headers: buildBatchHeaders(params.gemini, { json: true }), + body: JSON.stringify(batchBody), + }, + onResponse: async (batchRes) => { + if (batchRes.ok) { + return (await batchRes.json()) as GeminiBatchStatus; + } + const text = await batchRes.text(); + if (batchRes.status === 404) { + throw new Error( + "gemini batch create failed: 404 (asyncBatchEmbedContent not available for this model/baseUrl). Disable remote.batch.enabled or switch providers.", + ); + } + throw new Error(`gemini batch create failed: ${batchRes.status} ${text}`); + }, }); - if (batchRes.ok) { - return (await batchRes.json()) as GeminiBatchStatus; - } - const text = await batchRes.text(); - if (batchRes.status === 404) { - throw new Error( - "gemini batch create failed: 404 (asyncBatchEmbedContent not available for this model/baseUrl). 
Disable remote.batch.enabled or switch providers.", - ); - } - throw new Error(`gemini batch create failed: ${batchRes.status} ${text}`); } async function fetchGeminiBatchStatus(params: { @@ -152,14 +165,20 @@ async function fetchGeminiBatchStatus(params: { : `batches/${params.batchName}`; const statusUrl = `${baseUrl}/${name}`; debugEmbeddingsLog("memory embeddings: gemini batch status", { statusUrl }); - const res = await fetch(statusUrl, { - headers: buildBatchHeaders(params.gemini, { json: true }), + return await withRemoteHttpResponse({ + url: statusUrl, + ssrfPolicy: params.gemini.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.gemini, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`gemini batch status failed: ${res.status} ${text}`); + } + return (await res.json()) as GeminiBatchStatus; + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`gemini batch status failed: ${res.status} ${text}`); - } - return (await res.json()) as GeminiBatchStatus; } async function fetchGeminiFileContent(params: { @@ -170,14 +189,20 @@ async function fetchGeminiFileContent(params: { const file = params.fileId.startsWith("files/") ? 
params.fileId : `files/${params.fileId}`; const downloadUrl = `${baseUrl}/${file}:download`; debugEmbeddingsLog("memory embeddings: gemini batch download", { downloadUrl }); - const res = await fetch(downloadUrl, { - headers: buildBatchHeaders(params.gemini, { json: true }), + return await withRemoteHttpResponse({ + url: downloadUrl, + ssrfPolicy: params.gemini.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.gemini, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`gemini batch file content failed: ${res.status} ${text}`); + } + return await res.text(); + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`gemini batch file content failed: ${res.status} ${text}`); - } - return await res.text(); } function parseGeminiBatchOutput(text: string): GeminiBatchOutputLine[] { diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts index 0f4a34754..c1a0a97c4 100644 --- a/src/memory/batch-openai.ts +++ b/src/memory/batch-openai.ts @@ -5,6 +5,7 @@ import { runEmbeddingBatchGroups } from "./batch-runner.js"; import { uploadBatchJsonlFile } from "./batch-upload.js"; import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; import type { OpenAiEmbeddingClient } from "./embeddings-openai.js"; +import { withRemoteHttpResponse } from "./remote-http.js"; export type OpenAiBatchRequest = { custom_id: string; @@ -54,6 +55,7 @@ async function submitOpenAiBatch(params: { return await postJsonWithRetry({ url: `${baseUrl}/batches`, headers: buildBatchHeaders(params.openAi, { json: true }), + ssrfPolicy: params.openAi.ssrfPolicy, body: { input_file_id: inputFileId, endpoint: OPENAI_BATCH_ENDPOINT, @@ -72,14 +74,20 @@ async function fetchOpenAiBatchStatus(params: { batchId: string; }): Promise { const baseUrl = normalizeBatchBaseUrl(params.openAi); - const res = await fetch(`${baseUrl}/batches/${params.batchId}`, { - headers: 
buildBatchHeaders(params.openAi, { json: true }), + return await withRemoteHttpResponse({ + url: `${baseUrl}/batches/${params.batchId}`, + ssrfPolicy: params.openAi.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.openAi, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`openai batch status failed: ${res.status} ${text}`); + } + return (await res.json()) as OpenAiBatchStatus; + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`openai batch status failed: ${res.status} ${text}`); - } - return (await res.json()) as OpenAiBatchStatus; } async function fetchOpenAiFileContent(params: { @@ -87,14 +95,20 @@ async function fetchOpenAiFileContent(params: { fileId: string; }): Promise { const baseUrl = normalizeBatchBaseUrl(params.openAi); - const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, { - headers: buildBatchHeaders(params.openAi, { json: true }), + return await withRemoteHttpResponse({ + url: `${baseUrl}/files/${params.fileId}/content`, + ssrfPolicy: params.openAi.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.openAi, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`openai batch file content failed: ${res.status} ${text}`); + } + return await res.text(); + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`openai batch file content failed: ${res.status} ${text}`); - } - return await res.text(); } function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] { diff --git a/src/memory/batch-upload.ts b/src/memory/batch-upload.ts index 94b871305..efe4aa700 100644 --- a/src/memory/batch-upload.ts +++ b/src/memory/batch-upload.ts @@ -4,6 +4,7 @@ import { type BatchHttpClientConfig, } from "./batch-utils.js"; import { hashText } from "./internal.js"; +import { withRemoteHttpResponse } from "./remote-http.js"; export async function 
uploadBatchJsonlFile(params: { client: BatchHttpClientConfig; @@ -20,16 +21,22 @@ export async function uploadBatchJsonlFile(params: { `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, ); - const fileRes = await fetch(`${baseUrl}/files`, { - method: "POST", - headers: buildBatchHeaders(params.client, { json: false }), - body: form, + const filePayload = await withRemoteHttpResponse({ + url: `${baseUrl}/files`, + ssrfPolicy: params.client.ssrfPolicy, + init: { + method: "POST", + headers: buildBatchHeaders(params.client, { json: false }), + body: form, + }, + onResponse: async (fileRes) => { + if (!fileRes.ok) { + const text = await fileRes.text(); + throw new Error(`${params.errorPrefix}: ${fileRes.status} ${text}`); + } + return (await fileRes.json()) as { id?: string }; + }, }); - if (!fileRes.ok) { - const text = await fileRes.text(); - throw new Error(`${params.errorPrefix}: ${fileRes.status} ${text}`); - } - const filePayload = (await fileRes.json()) as { id?: string }; if (!filePayload.id) { throw new Error(`${params.errorPrefix}: missing file id`); } diff --git a/src/memory/batch-utils.ts b/src/memory/batch-utils.ts index 95aa773e8..c8f9249d9 100644 --- a/src/memory/batch-utils.ts +++ b/src/memory/batch-utils.ts @@ -1,6 +1,9 @@ +import type { SsrFPolicy } from "../infra/net/ssrf.js"; + export type BatchHttpClientConfig = { baseUrl?: string; headers?: Record; + ssrfPolicy?: SsrFPolicy; }; export function normalizeBatchBaseUrl(client: BatchHttpClientConfig): string { diff --git a/src/memory/batch-voyage.ts b/src/memory/batch-voyage.ts index e1f4c4df9..322adedc3 100644 --- a/src/memory/batch-voyage.ts +++ b/src/memory/batch-voyage.ts @@ -7,6 +7,7 @@ import { runEmbeddingBatchGroups } from "./batch-runner.js"; import { uploadBatchJsonlFile } from "./batch-upload.js"; import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js"; import type { VoyageEmbeddingClient } from "./embeddings-voyage.js"; +import { withRemoteHttpResponse } from 
"./remote-http.js"; /** * Voyage Batch API Input Line format. @@ -58,6 +59,7 @@ async function submitVoyageBatch(params: { return await postJsonWithRetry({ url: `${baseUrl}/batches`, headers: buildBatchHeaders(params.client, { json: true }), + ssrfPolicy: params.client.ssrfPolicy, body: { input_file_id: inputFileId, endpoint: VOYAGE_BATCH_ENDPOINT, @@ -80,14 +82,20 @@ async function fetchVoyageBatchStatus(params: { batchId: string; }): Promise { const baseUrl = normalizeBatchBaseUrl(params.client); - const res = await fetch(`${baseUrl}/batches/${params.batchId}`, { - headers: buildBatchHeaders(params.client, { json: true }), + return await withRemoteHttpResponse({ + url: `${baseUrl}/batches/${params.batchId}`, + ssrfPolicy: params.client.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.client, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`voyage batch status failed: ${res.status} ${text}`); + } + return (await res.json()) as VoyageBatchStatus; + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`voyage batch status failed: ${res.status} ${text}`); - } - return (await res.json()) as VoyageBatchStatus; } async function readVoyageBatchError(params: { @@ -96,23 +104,29 @@ async function readVoyageBatchError(params: { }): Promise { try { const baseUrl = normalizeBatchBaseUrl(params.client); - const res = await fetch(`${baseUrl}/files/${params.errorFileId}/content`, { - headers: buildBatchHeaders(params.client, { json: true }), + return await withRemoteHttpResponse({ + url: `${baseUrl}/files/${params.errorFileId}/content`, + ssrfPolicy: params.client.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.client, { json: true }), + }, + onResponse: async (res) => { + if (!res.ok) { + const text = await res.text(); + throw new Error(`voyage batch error file content failed: ${res.status} ${text}`); + } + const text = await res.text(); + if (!text.trim()) { + 
return undefined; + } + const lines = text + .split("\n") + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => JSON.parse(line) as VoyageBatchOutputLine); + return extractBatchErrorMessage(lines); + }, }); - if (!res.ok) { - const text = await res.text(); - throw new Error(`voyage batch error file content failed: ${res.status} ${text}`); - } - const text = await res.text(); - if (!text.trim()) { - return undefined; - } - const lines = text - .split("\n") - .map((line) => line.trim()) - .filter(Boolean) - .map((line) => JSON.parse(line) as VoyageBatchOutputLine); - return extractBatchErrorMessage(lines); } catch (err) { return formatUnavailableBatchError(err); } @@ -228,33 +242,41 @@ export async function runVoyageEmbeddingBatches(params: { } const baseUrl = normalizeBatchBaseUrl(params.client); - const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, { - headers: buildBatchHeaders(params.client, { json: true }), - }); - if (!contentRes.ok) { - const text = await contentRes.text(); - throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`); - } - const errors: string[] = []; + const errors: string[] = []; const remaining = new Set(group.map((request) => request.custom_id)); - if (contentRes.body) { - const reader = createInterface({ - input: Readable.fromWeb( - contentRes.body as unknown as import("stream/web").ReadableStream, - ), - terminal: false, - }); - - for await (const rawLine of reader) { - if (!rawLine.trim()) { - continue; + await withRemoteHttpResponse({ + url: `${baseUrl}/files/${completed.outputFileId}/content`, + ssrfPolicy: params.client.ssrfPolicy, + init: { + headers: buildBatchHeaders(params.client, { json: true }), + }, + onResponse: async (contentRes) => { + if (!contentRes.ok) { + const text = await contentRes.text(); + throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`); } - const line = JSON.parse(rawLine) as VoyageBatchOutputLine; - applyEmbeddingBatchOutputLine({ line,
remaining, errors, byCustomId }); - } - } + + if (!contentRes.body) { + return; + } + const reader = createInterface({ + input: Readable.fromWeb( + contentRes.body as unknown as import("stream/web").ReadableStream, + ), + terminal: false, + }); + + for await (const rawLine of reader) { + if (!rawLine.trim()) { + continue; + } + const line = JSON.parse(rawLine) as VoyageBatchOutputLine; + applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId }); + } + }, + }); if (errors.length > 0) { throw new Error(`voyage batch ${batchInfo.id} failed: ${errors.join("; ")}`);