/**
 * analyticsService — analytics over document processing (TypeScript, ~335 lines).
 *
 * History note (from commit message): the document_processing_events table was
 * never populated. Analytics endpoints now query the documents table directly
 * using status and timestamp columns. Upload page labels were also updated to
 * remove outdated "Agentic RAG" references.
 * Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
 */
import { getSupabaseServiceClient } from '../config/supabase';
|
|
import { getPostgresPool } from '../config/supabase';
|
|
import { logger } from '../utils/logger';
|
|
|
|
// =============================================================================
// Types
// =============================================================================

/**
 * Payload for one document-processing lifecycle event, written by
 * recordProcessingEvent() to the document_processing_events table.
 *
 * Optional fields are persisted as NULL when absent (see the `?? null`
 * coalescing in recordProcessingEvent).
 */
export interface ProcessingEventData {
  // Id of the document this event belongs to.
  document_id: string;
  // Id of the user who owns the document.
  user_id: string;
  // Lifecycle stage marker for the event.
  event_type: 'upload_started' | 'processing_started' | 'completed' | 'failed';
  // Elapsed processing time in milliseconds, when known.
  duration_ms?: number;
  // Human-readable failure description — presumably set for 'failed' events;
  // TODO(review): confirm against call sites.
  error_message?: string;
  // Free-form pipeline stage label, when known.
  stage?: string;
}
|
|
|
|
// =============================================================================
|
|
// recordProcessingEvent
|
|
// =============================================================================
|
|
|
|
/**
|
|
* Fire-and-forget analytics write for document processing lifecycle events.
|
|
*
|
|
* Return type is void (NOT Promise<void>) to prevent accidental await on the
|
|
* critical processing path. Any Supabase write failure is logged but never
|
|
* thrown — analytics must never block or break document processing.
|
|
*
|
|
* Architecture decision: Analytics writes are always fire-and-forget.
|
|
* See STATE.md: "Analytics writes are always fire-and-forget (never await on critical path)"
|
|
*/
|
|
export function recordProcessingEvent(data: ProcessingEventData): void {
|
|
const supabase = getSupabaseServiceClient();
|
|
|
|
void supabase
|
|
.from('document_processing_events')
|
|
.insert({
|
|
document_id: data.document_id,
|
|
user_id: data.user_id,
|
|
event_type: data.event_type,
|
|
duration_ms: data.duration_ms ?? null,
|
|
error_message: data.error_message ?? null,
|
|
stage: data.stage ?? null,
|
|
created_at: new Date().toISOString(),
|
|
})
|
|
.then(({ error }) => {
|
|
if (error) {
|
|
logger.error('analyticsService: failed to insert processing event', {
|
|
error: error.message,
|
|
document_id: data.document_id,
|
|
event_type: data.event_type,
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
// =============================================================================
|
|
// deleteProcessingEventsOlderThan
|
|
// =============================================================================
|
|
|
|
/**
|
|
* Delete document_processing_events rows older than `days` days.
|
|
*
|
|
* Used by the retention cleanup job to enforce data retention policy.
|
|
* Returns the count of rows deleted.
|
|
*
|
|
* Follows the same pattern as HealthCheckModel.deleteOlderThan().
|
|
*/
|
|
export async function deleteProcessingEventsOlderThan(days: number): Promise<number> {
|
|
const cutoff = new Date(Date.now() - days * 86400000).toISOString();
|
|
const supabase = getSupabaseServiceClient();
|
|
|
|
const { data, error } = await supabase
|
|
.from('document_processing_events')
|
|
.delete()
|
|
.lt('created_at', cutoff)
|
|
.select();
|
|
|
|
if (error) {
|
|
logger.error('analyticsService: failed to delete old processing events', {
|
|
error: error.message,
|
|
days,
|
|
cutoff,
|
|
});
|
|
throw new Error(`failed to delete processing events older than ${days} days — ${error.message}`);
|
|
}
|
|
|
|
return data ? data.length : 0;
|
|
}
|
|
|
|
// =============================================================================
// AnalyticsSummary — aggregate query for admin API
// =============================================================================

/**
 * Aggregate processing summary returned by getAnalyticsSummary().
 */
export interface AnalyticsSummary {
  // The range token that was requested (e.g. '24h', '7d'), echoed back.
  range: string;
  // Total uploads counted within the range.
  totalUploads: number;
  // Count of successfully completed items within the range.
  succeeded: number;
  // Count of failed items within the range.
  failed: number;
  // succeeded / totalUploads; 0 when there were no uploads.
  successRate: number;
  // Mean processing duration in milliseconds; null when no completions exist.
  avgProcessingMs: number | null;
  // ISO-8601 timestamp of when this summary was computed.
  generatedAt: string;
}
|
|
|
|
function parseRange(range: string): string {
|
|
if (/^\d+h$/.test(range)) return range.replace('h', ' hours');
|
|
if (/^\d+d$/.test(range)) return range.replace('d', ' days');
|
|
return '24 hours'; // fallback default
|
|
}
|
|
|
|
/**
|
|
* Returns a processing summary aggregate for the given time range.
|
|
* Uses getPostgresPool() for aggregate SQL — Supabase JS client does not support COUNT/AVG.
|
|
*/
|
|
export async function getAnalyticsSummary(range: string = '24h'): Promise<AnalyticsSummary> {
|
|
const interval = parseRange(range);
|
|
const pool = getPostgresPool();
|
|
|
|
const { rows } = await pool.query<{
|
|
total_uploads: string;
|
|
succeeded: string;
|
|
failed: string;
|
|
avg_processing_ms: string | null;
|
|
}>(`
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE event_type = 'upload_started') AS total_uploads,
|
|
COUNT(*) FILTER (WHERE event_type = 'completed') AS succeeded,
|
|
COUNT(*) FILTER (WHERE event_type = 'failed') AS failed,
|
|
AVG(duration_ms) FILTER (WHERE event_type = 'completed') AS avg_processing_ms
|
|
FROM document_processing_events
|
|
WHERE created_at >= NOW() - $1::interval
|
|
`, [interval]);
|
|
|
|
const row = rows[0]!;
|
|
const total = parseInt(row.total_uploads, 10);
|
|
const succeeded = parseInt(row.succeeded, 10);
|
|
const failed = parseInt(row.failed, 10);
|
|
|
|
return {
|
|
range,
|
|
totalUploads: total,
|
|
succeeded,
|
|
failed,
|
|
successRate: total > 0 ? succeeded / total : 0,
|
|
avgProcessingMs: row.avg_processing_ms ? parseFloat(row.avg_processing_ms) : null,
|
|
generatedAt: new Date().toISOString(),
|
|
};
|
|
}
|
|
|
|
// =============================================================================
// getSessionAnalytics — per-day session stats for Analytics tab
// =============================================================================

/**
 * Per-day session statistics payload returned by getSessionAnalytics().
 *
 * Numeric values are strings because node-postgres returns COUNT/AVG
 * aggregates as strings, and they are passed through unparsed.
 */
export interface SessionAnalytics {
  // One entry per calendar day that had at least one document created.
  sessionStats: Array<{
    // Calendar date (DATE(created_at)) the stats are grouped by.
    date: string;
    // Documents created that day.
    total_sessions: string;
    // Documents that reached status = 'completed'.
    successful_sessions: string;
    // Documents that reached status = 'failed'.
    failed_sessions: string;
    // Mean (updated_at - created_at) in ms for completed documents; '0' if none.
    avg_processing_time: string;
    // Always '0' — cost tracking not implemented.
    avg_cost: string;
  }>;
  // Always empty — agent-level tracking not available in current schema.
  agentStats: Array<Record<string, string>>;
  // Always empty — quality scores not available in current schema.
  qualityStats: Array<Record<string, string>>;
  // The time window the stats cover.
  period: {
    startDate: string;
    endDate: string;
    days: number;
  };
}
|
|
|
|
/**
|
|
* Returns per-day session statistics from the documents table.
|
|
* Groups documents by creation date, counts by status, and derives processing time
|
|
* from (updated_at - created_at) for completed documents.
|
|
*/
|
|
export async function getSessionAnalytics(days: number): Promise<SessionAnalytics> {
|
|
const pool = getPostgresPool();
|
|
const interval = `${days} days`;
|
|
|
|
const { rows } = await pool.query<{
|
|
date: string;
|
|
total_sessions: string;
|
|
successful_sessions: string;
|
|
failed_sessions: string;
|
|
avg_processing_time: string;
|
|
}>(`
|
|
SELECT
|
|
DATE(created_at) AS date,
|
|
COUNT(*) AS total_sessions,
|
|
COUNT(*) FILTER (WHERE status = 'completed') AS successful_sessions,
|
|
COUNT(*) FILTER (WHERE status = 'failed') AS failed_sessions,
|
|
COALESCE(
|
|
AVG(EXTRACT(EPOCH FROM (updated_at - created_at)) * 1000)
|
|
FILTER (WHERE status = 'completed'), 0
|
|
) AS avg_processing_time
|
|
FROM documents
|
|
WHERE created_at >= NOW() - $1::interval
|
|
GROUP BY DATE(created_at)
|
|
ORDER BY date DESC
|
|
`, [interval]);
|
|
|
|
const endDate = new Date();
|
|
const startDate = new Date(Date.now() - days * 86400000);
|
|
|
|
return {
|
|
sessionStats: rows.map(r => ({
|
|
date: r.date,
|
|
total_sessions: r.total_sessions,
|
|
successful_sessions: r.successful_sessions,
|
|
failed_sessions: r.failed_sessions,
|
|
avg_processing_time: r.avg_processing_time,
|
|
avg_cost: '0', // cost tracking not implemented
|
|
})),
|
|
agentStats: [], // agent-level tracking not available in current schema
|
|
qualityStats: [], // quality scores not available in current schema
|
|
period: {
|
|
startDate: startDate.toISOString(),
|
|
endDate: endDate.toISOString(),
|
|
days,
|
|
},
|
|
};
|
|
}
|
|
|
|
// =============================================================================
// getProcessingStatsFromEvents — processing pipeline stats for Analytics tab
// =============================================================================

/**
 * Pipeline-wide processing statistics returned by getProcessingStatsFromEvents().
 *
 * The documentAiAgenticRag key names appear to be a legacy response shape the
 * Analytics tab expects — NOTE(review): confirm against the frontend consumer.
 */
export interface ProcessingStatsFromEvents {
  // Total rows in the documents table.
  totalDocuments: number;
  // Documents with status = 'completed'.
  documentAiAgenticRagSuccess: number;
  // Mean (updated_at - created_at) in ms for completed documents; 0 if none.
  averageProcessingTime: { documentAiAgenticRag: number };
  // Always 0 — API-call counts are not tracked in the current schema.
  averageApiCalls: { documentAiAgenticRag: number };
}
|
|
|
|
/**
|
|
* Returns processing pipeline statistics from the documents table.
|
|
*/
|
|
export async function getProcessingStatsFromEvents(): Promise<ProcessingStatsFromEvents> {
|
|
const pool = getPostgresPool();
|
|
|
|
const { rows } = await pool.query<{
|
|
total_documents: string;
|
|
succeeded: string;
|
|
avg_processing_ms: string | null;
|
|
}>(`
|
|
SELECT
|
|
COUNT(*) AS total_documents,
|
|
COUNT(*) FILTER (WHERE status = 'completed') AS succeeded,
|
|
AVG(EXTRACT(EPOCH FROM (updated_at - created_at)) * 1000)
|
|
FILTER (WHERE status = 'completed') AS avg_processing_ms
|
|
FROM documents
|
|
`);
|
|
|
|
const row = rows[0]!;
|
|
|
|
return {
|
|
totalDocuments: parseInt(row.total_documents, 10),
|
|
documentAiAgenticRagSuccess: parseInt(row.succeeded, 10),
|
|
averageProcessingTime: {
|
|
documentAiAgenticRag: row.avg_processing_ms ? parseFloat(row.avg_processing_ms) : 0,
|
|
},
|
|
averageApiCalls: {
|
|
documentAiAgenticRag: 0,
|
|
},
|
|
};
|
|
}
|
|
|
|
// =============================================================================
// getHealthFromEvents — system health status for Analytics tab
// =============================================================================

/**
 * System health snapshot returned by getHealthFromEvents(), derived from
 * documents created in the last 24 hours.
 */
export interface HealthFromEvents {
  // errorRate > 0.5 → 'unhealthy'; > 0.2 → 'degraded'; else 'healthy'.
  status: 'healthy' | 'degraded' | 'unhealthy';
  // Always {} — agent-level health is not tracked in the current schema.
  agents: Record<string, unknown>;
  overall: {
    // completed / total in the window; 1.0 when the window is empty.
    successRate: number;
    // Mean (updated_at - created_at) in ms for completed documents; 0 if none.
    averageProcessingTime: number;
    // Documents whose status is neither 'completed' nor 'failed'.
    activeSessions: number;
    // failed / total in the window; 0 when the window is empty.
    errorRate: number;
  };
  // ISO-8601 timestamp of when the snapshot was computed.
  timestamp: string;
}
|
|
|
|
/**
|
|
* Derives system health status from recent documents.
|
|
* Looks at the last 24 hours to determine health.
|
|
*/
|
|
export async function getHealthFromEvents(): Promise<HealthFromEvents> {
|
|
const pool = getPostgresPool();
|
|
|
|
const { rows } = await pool.query<{
|
|
total: string;
|
|
succeeded: string;
|
|
failed: string;
|
|
avg_processing_ms: string | null;
|
|
active: string;
|
|
}>(`
|
|
SELECT
|
|
COUNT(*) AS total,
|
|
COUNT(*) FILTER (WHERE status = 'completed') AS succeeded,
|
|
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
|
|
AVG(EXTRACT(EPOCH FROM (updated_at - created_at)) * 1000)
|
|
FILTER (WHERE status = 'completed') AS avg_processing_ms,
|
|
COUNT(*) FILTER (
|
|
WHERE status NOT IN ('completed', 'failed')
|
|
) AS active
|
|
FROM documents
|
|
WHERE created_at >= NOW() - INTERVAL '24 hours'
|
|
`);
|
|
|
|
const row = rows[0]!;
|
|
const total = parseInt(row.total, 10);
|
|
const succeeded = parseInt(row.succeeded, 10);
|
|
const failed = parseInt(row.failed, 10);
|
|
const successRate = total > 0 ? succeeded / total : 1.0;
|
|
const errorRate = total > 0 ? failed / total : 0;
|
|
|
|
let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
|
|
if (errorRate > 0.5) status = 'unhealthy';
|
|
else if (errorRate > 0.2) status = 'degraded';
|
|
|
|
return {
|
|
status,
|
|
agents: {},
|
|
overall: {
|
|
successRate,
|
|
averageProcessingTime: row.avg_processing_ms ? parseFloat(row.avg_processing_ms) : 0,
|
|
activeSessions: parseInt(row.active, 10),
|
|
errorRate,
|
|
},
|
|
timestamp: new Date().toISOString(),
|
|
};
|
|
}
|