refactor(infra): share jsonl transcript reader

This commit is contained in:
Peter Steinberger
2026-02-15 21:53:12 +00:00
parent c92bcf24c4
commit 8cd20e220f

View File

@@ -212,39 +212,52 @@ const applyCostTotal = (totals: CostUsageTotals, costTotal: number | undefined)
totals.totalCost += costTotal;
};
async function* readJsonlRecords(filePath: string): AsyncGenerator<Record<string, unknown>> {
const fileStream = fs.createReadStream(filePath, { encoding: "utf-8" });
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
try {
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
try {
const parsed = JSON.parse(trimmed) as unknown;
if (!parsed || typeof parsed !== "object") {
continue;
}
yield parsed as Record<string, unknown>;
} catch {
// Ignore malformed lines
}
}
} finally {
rl.close();
fileStream.destroy();
}
}
async function scanTranscriptFile(params: {
filePath: string;
config?: OpenClawConfig;
onEntry: (entry: ParsedTranscriptEntry) => void;
}): Promise<void> {
const fileStream = fs.createReadStream(params.filePath, { encoding: "utf-8" });
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) {
for await (const parsed of readJsonlRecords(params.filePath)) {
const entry = parseTranscriptEntry(parsed);
if (!entry) {
continue;
}
try {
const parsed = JSON.parse(trimmed) as Record<string, unknown>;
const entry = parseTranscriptEntry(parsed);
if (!entry) {
continue;
}
if (entry.usage && entry.costTotal === undefined) {
const cost = resolveModelCostConfig({
provider: entry.provider,
model: entry.model,
config: params.config,
});
entry.costTotal = estimateUsageCost({ usage: entry.usage, cost });
}
params.onEntry(entry);
} catch {
// Ignore malformed lines
if (entry.usage && entry.costTotal === undefined) {
const cost = resolveModelCostConfig({
provider: entry.provider,
model: entry.model,
config: params.config,
});
entry.costTotal = estimateUsageCost({ usage: entry.usage, cost });
}
params.onEntry(entry);
}
}
@@ -400,16 +413,8 @@ export async function discoverAllSessions(params?: {
// Try to read first user message for label extraction
let firstUserMessage: string | undefined;
try {
const fileStream = fs.createReadStream(filePath, { encoding: "utf-8" });
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
for await (const parsed of readJsonlRecords(filePath)) {
try {
const parsed = JSON.parse(trimmed) as Record<string, unknown>;
const message = parsed.message as Record<string, unknown> | undefined;
if (message?.role === "user") {
const content = message.content;
@@ -436,8 +441,6 @@ export async function discoverAllSessions(params?: {
// Skip malformed lines
}
}
rl.close();
fileStream.destroy();
} catch {
// Ignore read errors
}
@@ -831,16 +834,8 @@ export async function loadSessionLogs(params: {
const logs: SessionLogEntry[] = [];
const limit = params.limit ?? 50;
const fileStream = fs.createReadStream(sessionFile, { encoding: "utf-8" });
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
for await (const parsed of readJsonlRecords(sessionFile)) {
try {
const parsed = JSON.parse(trimmed) as Record<string, unknown>;
const message = parsed.message as Record<string, unknown> | undefined;
if (!message) {
continue;