From e707a7bd36a9250d71e142ee8e6dee4b719a9e5f Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sat, 14 Feb 2026 13:34:16 +0000
Subject: [PATCH] refactor(memory): reuse runWithConcurrency

---
 src/memory/batch-gemini.ts | 37 +------------------------------------
 src/memory/batch-openai.ts | 37 +------------------------------------
 2 files changed, 2 insertions(+), 72 deletions(-)

diff --git a/src/memory/batch-gemini.ts b/src/memory/batch-gemini.ts
index 60c8c7e9a..83c8e8318 100644
--- a/src/memory/batch-gemini.ts
+++ b/src/memory/batch-gemini.ts
@@ -1,7 +1,7 @@
 import type { GeminiEmbeddingClient } from "./embeddings-gemini.js";
 import { isTruthyEnvValue } from "../infra/env.js";
 import { createSubsystemLogger } from "../logging/subsystem.js";
-import { hashText } from "./internal.js";
+import { hashText, runWithConcurrency } from "./internal.js";
 
 export type GeminiBatchRequest = {
   custom_id: string;
@@ -277,41 +277,6 @@ async function waitForGeminiBatch(params: {
   }
 }
 
-async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
-  if (tasks.length === 0) {
-    return [];
-  }
-  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
-  const results: T[] = Array.from({ length: tasks.length });
-  let next = 0;
-  let firstError: unknown = null;
-
-  const workers = Array.from({ length: resolvedLimit }, async () => {
-    while (true) {
-      if (firstError) {
-        return;
-      }
-      const index = next;
-      next += 1;
-      if (index >= tasks.length) {
-        return;
-      }
-      try {
-        results[index] = await tasks[index]();
-      } catch (err) {
-        firstError = err;
-        return;
-      }
-    }
-  });
-
-  await Promise.allSettled(workers);
-  if (firstError) {
-    throw firstError;
-  }
-  return results;
-}
-
 export async function runGeminiEmbeddingBatches(params: {
   gemini: GeminiEmbeddingClient;
   agentId: string;
diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts
index 292730704..efda18838 100644
--- a/src/memory/batch-openai.ts
+++ b/src/memory/batch-openai.ts
@@ -1,6 +1,6 @@
 import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
 import { retryAsync } from "../infra/retry.js";
-import { hashText } from "./internal.js";
+import { hashText, runWithConcurrency } from "./internal.js";
 
 export type OpenAiBatchRequest = {
   custom_id: string;
@@ -245,41 +245,6 @@ async function waitForOpenAiBatch(params: {
   }
 }
 
-async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
-  if (tasks.length === 0) {
-    return [];
-  }
-  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
-  const results: T[] = Array.from({ length: tasks.length });
-  let next = 0;
-  let firstError: unknown = null;
-
-  const workers = Array.from({ length: resolvedLimit }, async () => {
-    while (true) {
-      if (firstError) {
-        return;
-      }
-      const index = next;
-      next += 1;
-      if (index >= tasks.length) {
-        return;
-      }
-      try {
-        results[index] = await tasks[index]();
-      } catch (err) {
-        firstError = err;
-        return;
-      }
-    }
-  });
-
-  await Promise.allSettled(workers);
-  if (firstError) {
-    throw firstError;
-  }
-  return results;
-}
-
 export async function runOpenAiEmbeddingBatches(params: {
   openAi: OpenAiEmbeddingClient;
   agentId: string;
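
Reviewer note: the patch only deletes the two private copies and switches both modules to import runWithConcurrency from "./internal.js", so it assumes src/memory/internal.ts now exports a helper with the same signature and behavior. That export is not part of this diff; below is a minimal sketch under that assumption (the body mirrors the deleted copies, and the demo tasks at the end are made-up names for illustration only).

// src/memory/internal.ts (assumed shared export; mirrors the duplicates removed above)
export async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
  if (tasks.length === 0) {
    return [];
  }
  // Clamp the worker count to [1, tasks.length].
  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
  const results: T[] = Array.from({ length: tasks.length });
  let next = 0;
  let firstError: unknown = null;

  // Each worker claims the next unclaimed index until the tasks run out or one of them throws.
  const workers = Array.from({ length: resolvedLimit }, async () => {
    while (true) {
      if (firstError) {
        return;
      }
      const index = next;
      next += 1;
      if (index >= tasks.length) {
        return;
      }
      try {
        results[index] = await tasks[index]();
      } catch (err) {
        firstError = err;
        return;
      }
    }
  });

  // Wait for every worker to stop, then surface the first failure; results keep input order.
  await Promise.allSettled(workers);
  if (firstError) {
    throw firstError;
  }
  return results;
}

// Usage sketch (hypothetical tasks; assumes an ES module for top-level await):
// run ten trivial async tasks with at most three in flight at once.
const doubled = await runWithConcurrency(
  Array.from({ length: 10 }, (_, i) => async () => i * 2),
  3,
);
console.log(doubled); // [0, 2, 4, ..., 18]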