Files
Moltbot/src/memory/manager.ts
2026-02-17 01:50:04 +01:00

620 lines
20 KiB
TypeScript

import type { DatabaseSync } from "node:sqlite";
import { type FSWatcher } from "chokidar";
import fs from "node:fs/promises";
import path from "node:path";
import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js";
import type { OpenClawConfig } from "../config/config.js";
import type {
MemoryEmbeddingProbeResult,
MemoryProviderStatus,
MemorySearchManager,
MemorySearchResult,
MemorySource,
MemorySyncProgressUpdate,
} from "./types.js";
import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import { resolveMemorySearchConfig } from "../agents/memory-search.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
createEmbeddingProvider,
type EmbeddingProvider,
type EmbeddingProviderResult,
type GeminiEmbeddingClient,
type OpenAiEmbeddingClient,
type VoyageEmbeddingClient,
} from "./embeddings.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { isMemoryPath, normalizeExtraMemoryPaths } from "./internal.js";
import { MemoryManagerEmbeddingOps } from "./manager-embedding-ops.js";
import { searchKeyword, searchVector } from "./manager-search.js";
import { extractKeywords } from "./query-expansion.js";
// Maximum snippet length, in characters, passed to the keyword and vector search helpers.
const SNIPPET_MAX_CHARS = 700;
// Table name handed to `searchVector` as the vector table.
const VECTOR_TABLE = "chunks_vec";
// Table name handed to `searchKeyword` as the full-text-search table.
const FTS_TABLE = "chunks_fts";
// Table whose row count is reported as the embedding cache size by `status()`.
const EMBEDDING_CACHE_TABLE = "embedding_cache";
// Reported as the batch failure `limit` by `status()`.
// NOTE(review): presumably also enforced by the embedding-ops base class — confirm there.
const BATCH_FAILURE_LIMIT = 2;
// Logger for the "memory" subsystem; used for warnings in this module.
const log = createSubsystemLogger("memory");
// Module-level cache of managers, keyed by agent id, workspace directory and serialized
// settings (see `MemoryIndexManager.get`); entries are removed by `close()`.
const INDEX_CACHE = new Map<string, MemoryIndexManager>();
export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements MemorySearchManager {
// Key under which this instance is stored in INDEX_CACHE; used by close() to evict it.
private readonly cacheKey: string;
// Constructor inputs kept for later use by this class and its base class.
protected readonly cfg: OpenClawConfig;
protected readonly agentId: string;
protected readonly workspaceDir: string;
protected readonly settings: ResolvedMemorySearchConfig;
// Active embedding provider; `null` means no provider is available and search runs FTS-only.
protected provider: EmbeddingProvider | null;
// Provider selection details copied from the provider result and reported by status().
private readonly requestedProvider: "openai" | "local" | "gemini" | "voyage" | "auto";
protected fallbackFrom?: "openai" | "local" | "gemini" | "voyage";
protected fallbackReason?: string;
// Reported by status() and probeEmbeddingAvailability() when there is no provider.
private readonly providerUnavailableReason?: string;
// Provider-specific clients copied from the provider result.
// NOTE(review): not used in this file; presumably consumed by the base class — confirm.
protected openAi?: OpenAiEmbeddingClient;
protected gemini?: GeminiEmbeddingClient;
protected voyage?: VoyageEmbeddingClient;
// Batch embedding configuration, assigned from resolveBatchConfig() in the constructor.
protected batch: {
enabled: boolean;
wait: boolean;
concurrency: number;
pollIntervalMs: number;
timeoutMs: number;
};
// Batch failure bookkeeping; only read here (by status()), updated elsewhere.
protected batchFailureCount = 0;
protected batchFailureLastError?: string;
protected batchFailureLastProvider?: string;
protected batchFailureLock: Promise<void> = Promise.resolve();
// SQLite handle returned by openDatabase(); closed by close().
protected db: DatabaseSync;
// Memory sources enabled for this manager (from settings.sources).
protected readonly sources: Set<MemorySource>;
// Value returned by computeProviderKey() at construction time.
protected providerKey: string;
// Embedding cache settings (from settings.cache).
protected readonly cache: { enabled: boolean; maxEntries?: number };
// Vector search state. `available` starts as null and is reported as `undefined` by status()
// while it is still null. NOTE(review): presumably set once the extension load is attempted.
protected readonly vector: {
enabled: boolean;
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
// Full-text search state. `enabled` mirrors settings.query.hybrid.enabled; `available`
// starts as false in the constructor.
protected readonly fts: {
enabled: boolean;
available: boolean;
loadError?: string;
};
protected vectorReady: Promise<boolean> | null = null;
// Watcher, timers and session subscription; all are torn down by close().
protected watcher: FSWatcher | null = null;
protected watchTimer: NodeJS.Timeout | null = null;
protected sessionWatchTimer: NodeJS.Timeout | null = null;
protected sessionUnsubscribe: (() => void) | null = null;
protected intervalTimer: NodeJS.Timeout | null = null;
// Set to true by close(); makes repeated close() calls a no-op.
protected closed = false;
// When either flag is set, search() may trigger a background sync (if sync.onSearch is on),
// and status() reports the index as dirty.
protected dirty = false;
protected sessionsDirty = false;
// Session tracking state; not touched in this file.
// NOTE(review): presumably maintained by the base class's session listener — confirm.
protected sessionsDirtyFiles = new Set<string>();
protected sessionPendingFiles = new Set<string>();
protected sessionDeltas = new Map<
string,
{ lastSize: number; pendingBytes: number; pendingMessages: number }
>();
// Session keys for which warmSession() has already started a sync.
private sessionWarm = new Set<string>();
// In-flight sync promise shared by concurrent sync() callers; null when idle.
private syncing: Promise<void> | null = null;
/**
 * Returns the shared manager for an agent, creating and caching it on first use.
 *
 * Managers are cached by agent id, workspace directory and the serialized
 * resolved settings, so repeated calls with an unchanged configuration reuse
 * the same instance.
 *
 * @param params.cfg - Application configuration used to resolve settings and directories.
 * @param params.agentId - Agent whose memory index is requested.
 * @param params.purpose - Forwarded to the constructor; only consulted when a new
 *   manager is actually built (a cached manager is returned as-is).
 * @returns The manager, or `null` when `resolveMemorySearchConfig` yields no
 *   settings for the agent.
 */
static async get(params: {
  cfg: OpenClawConfig;
  agentId: string;
  purpose?: "default" | "status";
}): Promise<MemoryIndexManager | null> {
  const { cfg, agentId } = params;
  const settings = resolveMemorySearchConfig(cfg, agentId);
  if (!settings) {
    return null;
  }
  const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
  const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`;
  const existing = INDEX_CACHE.get(key);
  if (existing) {
    return existing;
  }
  const providerResult = await createEmbeddingProvider({
    config: cfg,
    agentDir: resolveAgentDir(cfg, agentId),
    provider: settings.provider,
    remote: settings.remote,
    model: settings.model,
    fallback: settings.fallback,
    local: settings.local,
  });
  // A concurrent caller may have cached a manager for this key while we were
  // awaiting the provider. Reuse it: constructing a second manager would open
  // another database handle and watcher, and overwriting the cache entry
  // would orphan the first one. Nothing awaits between this check and the
  // `set` below, so the two steps cannot interleave with another caller.
  const raced = INDEX_CACHE.get(key);
  if (raced) {
    return raced;
  }
  const manager = new MemoryIndexManager({
    cacheKey: key,
    cfg,
    agentId,
    workspaceDir,
    settings,
    providerResult,
    purpose: params.purpose,
  });
  INDEX_CACHE.set(key, manager);
  return manager;
}
/**
 * Builds a manager from already-resolved settings and an embedding provider result.
 *
 * Private: instances are created through `MemoryIndexManager.get()`, which also
 * stores them in the module-level cache under `params.cacheKey`.
 *
 * The statement order below matters because several inherited helpers
 * (`openDatabase`, `ensureSchema`, `readMeta`, `ensureWatcher`, ...) are called
 * while fields are still being assigned.
 */
private constructor(params: {
cacheKey: string;
cfg: OpenClawConfig;
agentId: string;
workspaceDir: string;
settings: ResolvedMemorySearchConfig;
providerResult: EmbeddingProviderResult;
purpose?: "default" | "status";
}) {
super();
this.cacheKey = params.cacheKey;
this.cfg = params.cfg;
this.agentId = params.agentId;
this.workspaceDir = params.workspaceDir;
this.settings = params.settings;
// Copy the provider selection outcome; `provider` may be null (FTS-only mode).
this.provider = params.providerResult.provider;
this.requestedProvider = params.providerResult.requestedProvider;
this.fallbackFrom = params.providerResult.fallbackFrom;
this.fallbackReason = params.providerResult.fallbackReason;
this.providerUnavailableReason = params.providerResult.providerUnavailableReason;
this.openAi = params.providerResult.openAi;
this.gemini = params.providerResult.gemini;
this.voyage = params.providerResult.voyage;
this.sources = new Set(params.settings.sources);
this.db = this.openDatabase();
this.providerKey = this.computeProviderKey();
this.cache = {
enabled: params.settings.cache.enabled,
maxEntries: params.settings.cache.maxEntries,
};
// FTS starts as unavailable; NOTE(review): presumably ensureSchema() flips `available`
// when the FTS table can be created — confirm in the base class.
this.fts = { enabled: params.settings.query.hybrid.enabled, available: false };
this.ensureSchema();
this.vector = {
enabled: params.settings.store.vector.enabled,
available: null,
extensionPath: params.settings.store.vector.extensionPath,
};
// Restore the vector dimension count recorded by a previous run, if any.
const meta = this.readMeta();
if (meta?.vectorDims) {
this.vector.dims = meta.vectorDims;
}
this.ensureWatcher();
this.ensureSessionListener();
this.ensureIntervalSync();
// With the "memory" source enabled the index starts dirty, except for status-only
// managers, which start dirty only when no stored meta exists yet.
const statusOnly = params.purpose === "status";
this.dirty = this.sources.has("memory") && (statusOnly ? !meta : true);
this.batch = this.resolveBatchConfig();
}
/**
 * Starts a background sync when a session begins.
 *
 * Does nothing unless `settings.sync.onSessionStart` is enabled. A non-empty
 * (trimmed) session key triggers the sync at most once; calls without a key
 * trigger it every time. The sync is fire-and-forget: failures are logged as
 * warnings and never surface to the caller.
 *
 * @param sessionKey - Optional identifier of the session being started.
 */
async warmSession(sessionKey?: string): Promise<void> {
  if (!this.settings.sync.onSessionStart) {
    return;
  }
  const trimmedKey = sessionKey?.trim() || "";
  const hasKey = trimmedKey !== "";
  if (hasKey && this.sessionWarm.has(trimmedKey)) {
    return;
  }
  const reportFailure = (err: unknown): void => {
    log.warn(`memory sync failed (session-start): ${String(err)}`);
  };
  void this.sync({ reason: "session-start" }).catch(reportFailure);
  if (hasKey) {
    this.sessionWarm.add(trimmedKey);
  }
}
/**
 * Searches the memory index for `query`.
 *
 * Before searching, the session is warmed and, when `settings.sync.onSearch`
 * is enabled and the index is dirty, a background sync is started; neither is
 * awaited.
 *
 * - Without an embedding provider the search is keyword-only: keywords are
 *   extracted from the query (falling back to the whole query), each is
 *   searched separately, and the best-scoring hit per chunk id is kept.
 * - With a provider the query is embedded and a vector search is run (skipped
 *   when the embedding is all zeros). When hybrid search is enabled, keyword
 *   hits are merged in using the configured weights.
 *
 * Keyword and vector lookups are best-effort: a failing lookup contributes no
 * results, and the failure is logged so that it can be told apart from a
 * query that simply has no matches.
 *
 * @param query - Free-text query; blank queries return an empty list.
 * @param opts.maxResults - Maximum number of results (defaults to `settings.query.maxResults`).
 * @param opts.minScore - Minimum score a result must reach (defaults to `settings.query.minScore`).
 * @param opts.sessionKey - Session identifier forwarded to `warmSession`.
 * @returns Results with `score >= minScore`, at most `maxResults` of them.
 * @throws Whatever `embedQueryWithTimeout` throws when the query cannot be embedded.
 */
async search(
  query: string,
  opts?: {
    maxResults?: number;
    minScore?: number;
    sessionKey?: string;
  },
): Promise<MemorySearchResult[]> {
  void this.warmSession(opts?.sessionKey);
  if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) {
    void this.sync({ reason: "search" }).catch((err) => {
      log.warn(`memory sync failed (search): ${String(err)}`);
    });
  }
  const cleaned = query.trim();
  if (!cleaned) {
    return [];
  }
  const minScore = opts?.minScore ?? this.settings.query.minScore;
  const maxResults = opts?.maxResults ?? this.settings.query.maxResults;
  const hybrid = this.settings.query.hybrid;
  // Over-fetch candidates so that merging and score filtering still leave
  // enough results; capped at 200 and never below 1.
  const candidates = Math.min(
    200,
    Math.max(1, Math.floor(maxResults * hybrid.candidateMultiplier)),
  );
  // Best-effort keyword lookup: a failure yields no hits but is logged rather
  // than silently discarded.
  const keywordSearch = (term: string) =>
    this.searchKeyword(term, candidates).catch((err) => {
      log.warn(`memory keyword search failed: ${String(err)}`);
      return [];
    });
  // FTS-only mode: no embedding provider available
  if (!this.provider) {
    if (!this.fts.enabled || !this.fts.available) {
      log.warn("memory search: no provider and FTS unavailable");
      return [];
    }
    // Extract keywords for better FTS matching on conversational queries
    // e.g., "that thing we discussed about the API" → ["discussed", "API"]
    const keywords = extractKeywords(cleaned);
    const searchTerms = keywords.length > 0 ? keywords : [cleaned];
    // Search with each keyword and merge results
    const resultSets = await Promise.all(searchTerms.map((term) => keywordSearch(term)));
    // Merge and deduplicate results, keeping highest score for each chunk
    const bestById = new Map<string, MemorySearchResult & { id: string; textScore: number }>();
    for (const results of resultSets) {
      for (const result of results) {
        const existing = bestById.get(result.id);
        if (!existing || result.score > existing.score) {
          bestById.set(result.id, result);
        }
      }
    }
    return [...bestById.values()]
      .toSorted((a, b) => b.score - a.score)
      .filter((entry) => entry.score >= minScore)
      .slice(0, maxResults);
  }
  const keywordResults = hybrid.enabled ? await keywordSearch(cleaned) : [];
  const queryVec = await this.embedQueryWithTimeout(cleaned);
  // An all-zero embedding carries no signal, so the vector lookup is skipped.
  const hasVector = queryVec.some((v) => v !== 0);
  const vectorResults = hasVector
    ? await this.searchVector(queryVec, candidates).catch((err) => {
        log.warn(`memory vector search failed: ${String(err)}`);
        return [];
      })
    : [];
  if (!hybrid.enabled) {
    return vectorResults.filter((entry) => entry.score >= minScore).slice(0, maxResults);
  }
  const merged = await this.mergeHybridResults({
    vector: vectorResults,
    keyword: keywordResults,
    vectorWeight: hybrid.vectorWeight,
    textWeight: hybrid.textWeight,
    mmr: hybrid.mmr,
    temporalDecay: hybrid.temporalDecay,
  });
  return merged.filter((entry) => entry.score >= minScore).slice(0, maxResults);
}
/**
 * Runs a vector similarity search through the shared `searchVector` helper.
 *
 * @param queryVec - Embedding of the query.
 * @param limit - Maximum number of candidates to fetch.
 * @returns Matching chunks, or an empty list when no embedding provider is set.
 */
private async searchVector(
  queryVec: number[],
  limit: number,
): Promise<Array<MemorySearchResult & { id: string }>> {
  // Guard only: callers are expected to check for a provider first.
  if (this.provider === null) {
    return [];
  }
  const providerModel = this.provider.model;
  const hits = await searchVector({
    db: this.db,
    vectorTable: VECTOR_TABLE,
    providerModel,
    queryVec,
    limit,
    snippetMaxChars: SNIPPET_MAX_CHARS,
    ensureVectorReady: async (dimensions) => await this.ensureVectorReady(dimensions),
    sourceFilterVec: this.buildSourceFilter("c"),
    sourceFilterChunks: this.buildSourceFilter(),
  });
  return hits.map((hit) => hit as MemorySearchResult & { id: string });
}
/**
 * Delegates to the module-level `buildFtsQuery` helper from `./hybrid.js`.
 *
 * @param raw - Raw user query text.
 * @returns The helper's result: a query string, or `null`.
 */
private buildFtsQuery(raw: string): string | null {
return buildFtsQuery(raw);
}
/**
 * Runs a full-text keyword search through the shared `searchKeyword` helper.
 *
 * @param query - Raw query text; converted to an FTS query via `buildFtsQuery`.
 * @param limit - Maximum number of candidates to fetch.
 * @returns Matching chunks with their text score, or an empty list when FTS is
 *   disabled or unavailable.
 */
private async searchKeyword(
  query: string,
  limit: number,
): Promise<Array<MemorySearchResult & { id: string; textScore: number }>> {
  const ftsUsable = this.fts.enabled && this.fts.available;
  if (!ftsUsable) {
    return [];
  }
  const sourceFilter = this.buildSourceFilter();
  const hits = await searchKeyword({
    db: this.db,
    ftsTable: FTS_TABLE,
    // In FTS-only mode there is no provider, so the model is `undefined` and
    // all models are searched; otherwise only the current provider's model.
    providerModel: this.provider?.model,
    query,
    limit,
    snippetMaxChars: SNIPPET_MAX_CHARS,
    sourceFilter,
    buildFtsQuery: (raw) => this.buildFtsQuery(raw),
    bm25RankToScore,
  });
  return hits.map((hit) => hit as MemorySearchResult & { id: string; textScore: number });
}
/**
 * Combines vector and keyword hits through the shared `mergeHybridResults` helper.
 *
 * Each hit is reduced to its location fields plus the score relevant to its
 * origin (`vectorScore` for vector hits, `textScore` for keyword hits) before
 * being handed to the helper, together with the weights, the optional MMR and
 * temporal-decay settings, and this manager's workspace directory.
 *
 * @param params - Hits to merge and the weighting options.
 * @returns The merged results as produced by the helper.
 */
private mergeHybridResults(params: {
  vector: Array<MemorySearchResult & { id: string }>;
  keyword: Array<MemorySearchResult & { id: string; textScore: number }>;
  vectorWeight: number;
  textWeight: number;
  mmr?: { enabled: boolean; lambda: number };
  temporalDecay?: { enabled: boolean; halfLifeDays: number };
}): Promise<MemorySearchResult[]> {
  // Fields shared by both kinds of hit, in the order the helper receives them.
  const locationOf = (hit: MemorySearchResult & { id: string }) => ({
    id: hit.id,
    path: hit.path,
    startLine: hit.startLine,
    endLine: hit.endLine,
    source: hit.source,
    snippet: hit.snippet,
  });
  const vectorHits = params.vector.map((hit) => ({
    ...locationOf(hit),
    vectorScore: hit.score,
  }));
  const keywordHits = params.keyword.map((hit) => ({
    ...locationOf(hit),
    textScore: hit.textScore,
  }));
  const pending = mergeHybridResults({
    vector: vectorHits,
    keyword: keywordHits,
    vectorWeight: params.vectorWeight,
    textWeight: params.textWeight,
    mmr: params.mmr,
    temporalDecay: params.temporalDecay,
    workspaceDir: this.workspaceDir,
  });
  return pending.then((entries) => entries.map((entry) => entry as MemorySearchResult));
}
/**
 * Synchronizes the index, sharing a single run between concurrent callers.
 *
 * If a sync is already in flight, its promise is returned and `params` of
 * this call are ignored. Otherwise a new run is started via `runSync` and
 * remembered until it settles.
 *
 * @param params.reason - Label describing why the sync was requested.
 * @param params.force - Forwarded to `runSync`.
 * @param params.progress - Progress callback forwarded to `runSync`.
 * @returns A promise that settles with the sync run.
 */
async sync(params?: {
  reason?: string;
  force?: boolean;
  progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void> {
  const inFlight = this.syncing;
  if (inFlight) {
    return inFlight;
  }
  const run = this.runSync(params).finally(() => {
    this.syncing = null;
  });
  this.syncing = run;
  return run;
}
/**
 * Reads a Markdown memory file, optionally restricted to a range of lines.
 *
 * The path is accepted when it lies inside the workspace and `isMemoryPath`
 * approves it, or when it is covered by one of `settings.extraPaths` (a
 * non-symlink directory containing it, or a non-symlink `.md` file equal to
 * it). The target itself must end in `.md` and be a regular, non-symlink file.
 *
 * @param params.relPath - Absolute path, or path relative to the workspace directory.
 * @param params.from - 1-based first line to return (values below 1 are treated as 1).
 * @param params.lines - Number of lines to return (at least 1; defaults to all lines).
 * @returns The text read and the path relative to the workspace directory
 *   (with forward slashes).
 * @throws Error("path required") when the path is blank, not allowed, not a
 *   `.md` file, a symlink, or not a regular file; file system errors from
 *   reading the target propagate unchanged.
 */
async readFile(params: {
  relPath: string;
  from?: number;
  lines?: number;
}): Promise<{ text: string; path: string }> {
  const requested = params.relPath.trim();
  if (requested === "") {
    throw new Error("path required");
  }
  let absPath: string;
  if (path.isAbsolute(requested)) {
    absPath = path.resolve(requested);
  } else {
    absPath = path.resolve(this.workspaceDir, requested);
  }
  const relPath = path.relative(this.workspaceDir, absPath).replace(/\\/g, "/");
  const outsideWorkspace =
    relPath.length === 0 || relPath.startsWith("..") || path.isAbsolute(relPath);
  let allowed = !outsideWorkspace && isMemoryPath(relPath);
  if (!allowed && this.settings.extraPaths.length > 0) {
    const extraRoots = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths);
    for (const root of extraRoots) {
      try {
        const rootStat = await fs.lstat(root);
        if (rootStat.isSymbolicLink()) {
          continue;
        }
        // A directory covers itself and everything beneath it; a file only
        // covers itself, and only when it is a Markdown file.
        const covers = rootStat.isDirectory()
          ? absPath === root || absPath.startsWith(`${root}${path.sep}`)
          : rootStat.isFile() && absPath === root && absPath.endsWith(".md");
        if (covers) {
          allowed = true;
          break;
        }
      } catch {
        // Extra paths that cannot be inspected are skipped.
      }
    }
  }
  if (!allowed) {
    throw new Error("path required");
  }
  if (!absPath.endsWith(".md")) {
    throw new Error("path required");
  }
  const targetStat = await fs.lstat(absPath);
  if (targetStat.isSymbolicLink() || !targetStat.isFile()) {
    throw new Error("path required");
  }
  const content = await fs.readFile(absPath, "utf-8");
  const wantsWholeFile = !params.from && !params.lines;
  if (wantsWholeFile) {
    return { text: content, path: relPath };
  }
  const allLines = content.split("\n");
  const firstLine = Math.max(1, params.from ?? 1);
  const lineCount = Math.max(1, params.lines ?? allLines.length);
  const offset = firstLine - 1;
  const selected = allLines.slice(offset, offset + lineCount);
  return { text: selected.join("\n"), path: relPath };
}
/**
 * Builds a snapshot of the index and provider state.
 *
 * File and chunk counts are read from the database, restricted by the source
 * filter, both in total and per enabled source. The remaining fields mirror
 * this manager's configuration and runtime state (provider, cache, FTS,
 * vector, batch and fallback information).
 *
 * @returns The status report for the builtin backend.
 */
status(): MemoryProviderStatus {
  const sourceFilter = this.buildSourceFilter();
  // Total number of rows in `table` that pass the source filter.
  const countRows = (table: "files" | "chunks") =>
    this.db
      .prepare(`SELECT COUNT(*) as c FROM ${table} WHERE 1=1${sourceFilter.sql}`)
      .get(...sourceFilter.params) as {
      c: number;
    };
  // Row counts of `table` grouped by source, restricted by the source filter.
  const countRowsBySource = (table: "files" | "chunks") =>
    this.db
      .prepare(
        `SELECT source, COUNT(*) as c FROM ${table} WHERE 1=1${sourceFilter.sql} GROUP BY source`,
      )
      .all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>;
  const files = countRows("files");
  const chunks = countRows("chunks");
  const collectSourceCounts = () => {
    const sources = Array.from(this.sources);
    if (sources.length === 0) {
      return [];
    }
    const tally = new Map<MemorySource, { files: number; chunks: number }>();
    for (const source of sources) {
      tally.set(source, { files: 0, chunks: 0 });
    }
    for (const row of countRowsBySource("files")) {
      const entry = tally.get(row.source) ?? { files: 0, chunks: 0 };
      entry.files = row.c ?? 0;
      tally.set(row.source, entry);
    }
    for (const row of countRowsBySource("chunks")) {
      const entry = tally.get(row.source) ?? { files: 0, chunks: 0 };
      entry.chunks = row.c ?? 0;
      tally.set(row.source, entry);
    }
    // Report the enabled sources only, in their configured order.
    return sources.map((source) => Object.assign({ source }, tally.get(source)!));
  };
  const sourceCounts = collectSourceCounts();
  // Search mode is "fts-only" without a provider and "hybrid" otherwise.
  const activeProvider = this.provider;
  const searchMode = activeProvider ? "hybrid" : "fts-only";
  const providerInfo = activeProvider
    ? { provider: activeProvider.id, model: activeProvider.model }
    : { provider: "none", model: undefined };
  // Only queried when the embedding cache is enabled.
  const countCacheEntries = (): number =>
    (
      this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as
        | { c: number }
        | undefined
    )?.c ?? 0;
  return {
    backend: "builtin",
    files: files?.c ?? 0,
    chunks: chunks?.c ?? 0,
    dirty: this.dirty || this.sessionsDirty,
    workspaceDir: this.workspaceDir,
    dbPath: this.settings.store.path,
    provider: providerInfo.provider,
    model: providerInfo.model,
    requestedProvider: this.requestedProvider,
    sources: Array.from(this.sources),
    extraPaths: this.settings.extraPaths,
    sourceCounts,
    cache: this.cache.enabled
      ? {
          enabled: true,
          entries: countCacheEntries(),
          maxEntries: this.cache.maxEntries,
        }
      : { enabled: false, maxEntries: this.cache.maxEntries },
    fts: {
      enabled: this.fts.enabled,
      available: this.fts.available,
      error: this.fts.loadError,
    },
    fallback: this.fallbackReason
      ? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason }
      : undefined,
    vector: {
      enabled: this.vector.enabled,
      available: this.vector.available ?? undefined,
      extensionPath: this.vector.extensionPath,
      loadError: this.vector.loadError,
      dims: this.vector.dims,
    },
    batch: {
      enabled: this.batch.enabled,
      failures: this.batchFailureCount,
      limit: BATCH_FAILURE_LIMIT,
      wait: this.batch.wait,
      concurrency: this.batch.concurrency,
      pollIntervalMs: this.batch.pollIntervalMs,
      timeoutMs: this.batch.timeoutMs,
      lastError: this.batchFailureLastError,
      lastProvider: this.batchFailureLastProvider,
    },
    custom: {
      searchMode,
      providerUnavailableReason: this.providerUnavailableReason,
    },
  };
}
/**
 * Reports whether vector search can be used.
 *
 * @returns `false` when there is no embedding provider (FTS-only mode) or
 *   vector storage is disabled; otherwise the result of `ensureVectorReady()`.
 */
async probeVectorAvailability(): Promise<boolean> {
  const vectorsConfigured = this.provider !== null && this.vector.enabled;
  if (!vectorsConfigured) {
    return false;
  }
  return this.ensureVectorReady();
}
/**
 * Checks whether embeddings can currently be produced.
 *
 * With a provider, a single "ping" text is embedded as a probe. Without one
 * (FTS-only mode, where search still works) the recorded unavailability
 * reason, or a generic message, is returned.
 *
 * @returns `{ ok: true }` on success, otherwise `{ ok: false, error }`.
 */
async probeEmbeddingAvailability(): Promise<MemoryEmbeddingProbeResult> {
  if (this.provider) {
    try {
      await this.embedBatchWithRetry(["ping"]);
      return { ok: true };
    } catch (err) {
      return { ok: false, error: err instanceof Error ? err.message : String(err) };
    }
  }
  const reason =
    this.providerUnavailableReason ?? "No embedding provider available (FTS-only mode)";
  return { ok: false, error: reason };
}
/**
 * Shuts the manager down: stops timers, the session subscription and the
 * file watcher, waits for an in-flight sync, closes the database and removes
 * this instance from the manager cache.
 *
 * Calling `close()` more than once is a no-op. The database is closed and the
 * cache entry removed even when stopping the watcher fails; that failure is
 * still reported to the caller.
 *
 * @throws Whatever `watcher.close()` or `db.close()` throws.
 */
async close(): Promise<void> {
  if (this.closed) {
    return;
  }
  this.closed = true;
  // Captured up front: the field is reset to null as soon as the run settles.
  const pendingSync = this.syncing;
  if (this.watchTimer) {
    clearTimeout(this.watchTimer);
    this.watchTimer = null;
  }
  if (this.sessionWatchTimer) {
    clearTimeout(this.sessionWatchTimer);
    this.sessionWatchTimer = null;
  }
  if (this.intervalTimer) {
    clearInterval(this.intervalTimer);
    this.intervalTimer = null;
  }
  // Synchronous, so done before any await: a failing watcher shutdown can no
  // longer cause the unsubscribe to be skipped.
  if (this.sessionUnsubscribe) {
    this.sessionUnsubscribe();
    this.sessionUnsubscribe = null;
  }
  try {
    const watcher = this.watcher;
    if (watcher) {
      this.watcher = null;
      await watcher.close();
    }
    if (pendingSync) {
      // Let the running sync finish before its database handle goes away.
      // Its outcome is irrelevant here; callers of sync() observe any error.
      await pendingSync.catch(() => undefined);
    }
  } finally {
    try {
      this.db.close();
    } finally {
      // Only evict our own entry; another manager may have been stored under
      // the same key in the meantime.
      if (INDEX_CACHE.get(this.cacheKey) === this) {
        INDEX_CACHE.delete(this.cacheKey);
      }
    }
  }
}
}