Files
sales-data-analysis/data_quality.py
Jonathan Pressnell cf0b596449 Initial commit: sales analysis template
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-06 09:16:34 -05:00

345 lines
12 KiB
Python

"""
Data quality reporting utility
Generates comprehensive data quality reports
Usage:
from data_quality import generate_data_quality_report, print_data_quality_report
# Generate and print report
report = generate_data_quality_report(df)
print_data_quality_report(report)
"""
import pandas as pd
import numpy as np
from config import (
REVENUE_COLUMN, DATE_COLUMN, CUSTOMER_COLUMN, ITEM_COLUMN,
QUANTITY_COLUMN, MIN_QUANTITY, MAX_QUANTITY
)
def _iqr_outlier_stats(series, n_rows):
    """Return IQR (1.5x fence) outlier stats for a numeric series.

    Args:
        series: Numeric pandas Series (NaNs compare False and are not counted).
        n_rows: Total row count used as the percentage denominator.

    Returns:
        dict with 'count', 'percentage', 'lower_bound', 'upper_bound'
        as plain Python int/float.
    """
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    count = int(((series < lower_bound) | (series > upper_bound)).sum())
    return {
        'count': count,
        'percentage': (count / n_rows) * 100 if n_rows > 0 else 0,
        'lower_bound': float(lower_bound),
        'upper_bound': float(upper_bound),
    }


def _collect_issues(report):
    """Derive the ordered issue list (critical first, then warnings)
    from an otherwise-complete report dict."""
    issues = []

    # Critical: columns that are mostly missing.
    high_missing = {
        col: pct
        for col, pct in report['missing_values']['percentages'].items()
        if pct > 50
    }
    if high_missing:
        issues.append({
            'severity': 'critical',
            'issue': f"Columns with >50% missing values: {list(high_missing.keys())}",
            'impact': 'High'
        })

    # Critical: too few parseable dates for time-based analyses.
    # (date_coverage is only populated when the date column exists.)
    date_coverage = report['date_coverage']
    if date_coverage and date_coverage['coverage_percentage'] < 50:
        issues.append({
            'severity': 'critical',
            'issue': f"Date coverage is only {date_coverage['coverage_percentage']:.1f}%",
            'impact': 'High - analyses may fail'
        })

    # Critical: more than 10% of rows failed numeric coercion of revenue.
    revenue_summary = report['revenue_summary']
    total_rows = report['overview']['total_rows']
    if revenue_summary.get('invalid_rows', 0) > total_rows * 0.1:
        issues.append({
            'severity': 'critical',
            'issue': f"{revenue_summary['invalid_rows']} rows have invalid revenue values",
            'impact': 'High'
        })

    # Warning: noticeable duplication.
    duplicates = report['duplicates']
    if duplicates['duplicate_percentage'] > 5:
        issues.append({
            'severity': 'warning',
            'issue': f"{duplicates['duplicate_rows']} duplicate rows ({duplicates['duplicate_percentage']:.1f}%)",
            'impact': 'Medium'
        })

    # Warning: heavy-tailed revenue distribution.
    revenue_outliers = report['outliers'].get('revenue')
    if revenue_outliers and revenue_outliers['percentage'] > 10:
        issues.append({
            'severity': 'warning',
            'issue': f"{revenue_outliers['count']} revenue outliers ({revenue_outliers['percentage']:.1f}%)",
            'impact': 'Medium'
        })
    return issues


def generate_data_quality_report(df):
    """
    Generate comprehensive data quality report.

    Sections: size overview, missing values, duplicate rows, outliers
    (IQR fences, or MIN_QUANTITY/MAX_QUANTITY from config for quantities
    when both thresholds are set), column dtypes, date coverage, revenue
    summary, and a derived issue list (critical/warning).

    Args:
        df: DataFrame to analyze

    Returns:
        dict: Dictionary containing data quality metrics. Numeric leaves
        are converted to plain Python int/float so the report can be
        serialized (e.g. to JSON) without numpy scalar types leaking.
    """
    n_rows = len(df)
    report = {
        'overview': {},
        'missing_values': {},
        'duplicates': {},
        'outliers': {},
        'data_types': {},
        'date_coverage': {},
        'revenue_summary': {},
        'issues': []
    }

    # Overview
    report['overview'] = {
        'total_rows': n_rows,
        'total_columns': len(df.columns),
        'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2
    }

    # Missing values. Guard n_rows == 0 so percentages stay finite
    # instead of going inf/NaN on an empty frame.
    missing = df.isnull().sum()
    missing_pct = (missing / n_rows) * 100 if n_rows > 0 else missing * 0.0
    report['missing_values'] = {
        'by_column': missing[missing > 0].to_dict(),
        'percentages': missing_pct[missing > 0].to_dict(),
        # int(...) — numpy integers are not JSON-serializable.
        'total_missing': int(missing.sum()),
        'columns_with_missing': int((missing > 0).sum())
    }

    # Duplicates
    duplicate_rows = int(df.duplicated().sum())
    report['duplicates'] = {
        'duplicate_rows': duplicate_rows,
        'duplicate_percentage': (duplicate_rows / n_rows) * 100 if n_rows > 0 else 0
    }

    # Outliers (revenue and quantity)
    outliers = {}
    if REVENUE_COLUMN in df.columns:
        revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce')
        revenue_stats = _iqr_outlier_stats(revenue, n_rows)
        revenue_stats['negative_values'] = int((revenue < 0).sum())
        outliers['revenue'] = revenue_stats
    if QUANTITY_COLUMN in df.columns:
        quantity = pd.to_numeric(df[QUANTITY_COLUMN], errors='coerce')
        if MIN_QUANTITY is not None and MAX_QUANTITY is not None:
            # Explicit business thresholds from config take precedence
            # over statistical fences.
            out_count = int(
                ((quantity < MIN_QUANTITY) | (quantity > MAX_QUANTITY)).sum()
            )
            outliers['quantity'] = {
                'count': out_count,
                'percentage': (out_count / n_rows) * 100 if n_rows > 0 else 0,
                'below_min': int((quantity < MIN_QUANTITY).sum()),
                'above_max': int((quantity > MAX_QUANTITY).sum())
            }
        else:
            outliers['quantity'] = _iqr_outlier_stats(quantity, n_rows)
    report['outliers'] = outliers

    # Data types
    report['data_types'] = {
        'numeric_columns': list(df.select_dtypes(include=[np.number]).columns),
        'datetime_columns': list(df.select_dtypes(include=['datetime64']).columns),
        'object_columns': list(df.select_dtypes(include=['object']).columns),
        # str()/int() keep dtype names and counts serializable.
        'type_summary': {
            str(dtype): int(count)
            for dtype, count in df.dtypes.value_counts().items()
        }
    }

    # Date coverage
    if DATE_COLUMN in df.columns:
        rows_with_dates = int(df[DATE_COLUMN].notna().sum())
        report['date_coverage'] = {
            'total_rows': n_rows,
            'rows_with_dates': rows_with_dates,
            'coverage_percentage': (rows_with_dates / n_rows) * 100 if n_rows > 0 else 0,
            'min_date': str(df[DATE_COLUMN].min()) if rows_with_dates > 0 else None,
            'max_date': str(df[DATE_COLUMN].max()) if rows_with_dates > 0 else None
        }

    # Revenue summary (only populated when at least one value coerces)
    if REVENUE_COLUMN in df.columns:
        valid_revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce').dropna()
        if len(valid_revenue) > 0:
            report['revenue_summary'] = {
                'total_revenue': float(valid_revenue.sum()),
                'mean_revenue': float(valid_revenue.mean()),
                'median_revenue': float(valid_revenue.median()),
                'min_revenue': float(valid_revenue.min()),
                'max_revenue': float(valid_revenue.max()),
                'std_revenue': float(valid_revenue.std()),
                'valid_rows': int(len(valid_revenue)),
                'invalid_rows': int(n_rows - len(valid_revenue))
            }

    # Identify issues from the assembled sections
    report['issues'] = _collect_issues(report)
    return report
def print_data_quality_report(report):
    """
    Print a formatted, sectioned view of a data quality report.

    Args:
        report: Dictionary produced by generate_data_quality_report()
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    print("\n" + heavy_rule)
    print("DATA QUALITY REPORT")
    print(heavy_rule)

    # Overview
    overview = report['overview']
    print("\n📊 OVERVIEW")
    print(light_rule)
    print(f"Total Rows: {overview['total_rows']:,}")
    print(f"Total Columns: {overview['total_columns']}")
    print(f"Memory Usage: {overview['memory_usage_mb']:.2f} MB")

    # Missing values (worst 10 columns by percentage)
    mv = report['missing_values']
    print("\n🔍 MISSING VALUES")
    print(light_rule)
    if mv['columns_with_missing'] == 0:
        print("✅ No missing values found")
    else:
        print(f"Columns with missing values: {mv['columns_with_missing']}")
        print(f"Total missing values: {mv['total_missing']:,}")
        print("\nTop columns by missing values:")
        worst = sorted(
            mv['percentages'].items(),
            key=lambda item: item[1],
            reverse=True
        )[:10]
        for col, pct in worst:
            count = mv['by_column'][col]
            print(f" {col:30s}: {count:8,} ({pct:5.1f}%)")

    # Duplicates
    dup = report['duplicates']
    print("\n🔄 DUPLICATES")
    print(light_rule)
    if dup['duplicate_rows'] == 0:
        print("✅ No duplicate rows found")
    else:
        print(f"Duplicate Rows: {dup['duplicate_rows']:,} ({dup['duplicate_percentage']:.2f}%)")

    # Outliers
    print("\n📈 OUTLIERS")
    print(light_rule)
    outlier_sections = report['outliers']
    revenue_out = outlier_sections.get('revenue')
    if revenue_out is not None:
        print(f"Revenue Outliers: {revenue_out['count']:,} ({revenue_out['percentage']:.2f}%)")
        if revenue_out.get('negative_values', 0) > 0:
            print(f" Negative Revenue Values: {revenue_out['negative_values']:,}")
    quantity_out = outlier_sections.get('quantity')
    if quantity_out is not None:
        print(f"Quantity Outliers: {quantity_out['count']:,} ({quantity_out['percentage']:.2f}%)")
    if not outlier_sections:
        print("✅ No significant outliers detected")

    # Date coverage (section omitted entirely when not computed)
    dc = report['date_coverage']
    if dc:
        print("\n📅 DATE COVERAGE")
        print(light_rule)
        print(f"Rows with Dates: {dc['rows_with_dates']:,} / {dc['total_rows']:,} ({dc['coverage_percentage']:.1f}%)")
        if dc['min_date']:
            print(f"Date Range: {dc['min_date']} to {dc['max_date']}")

    # Revenue summary (section omitted entirely when not computed)
    rs = report['revenue_summary']
    if rs:
        print("\n💰 REVENUE SUMMARY")
        print(light_rule)
        print(f"Total Revenue: ${rs['total_revenue'] / 1e6:.2f}m")
        print(f"Valid Rows: {rs['valid_rows']:,} / {rs['valid_rows'] + rs['invalid_rows']:,}")
        if rs['invalid_rows'] > 0:
            print(f"Invalid Rows: {rs['invalid_rows']:,}")
        print(f"Mean: ${rs['mean_revenue']:,.2f}")
        print(f"Median: ${rs['median_revenue']:,.2f}")
        print(f"Min: ${rs['min_revenue']:,.2f}")
        print(f"Max: ${rs['max_revenue']:,.2f}")

    # Issues, grouped by severity (critical before warnings)
    if report['issues']:
        print("\n⚠️ ISSUES DETECTED")
        print(light_rule)
        critical_issues = [i for i in report['issues'] if i['severity'] == 'critical']
        warning_issues = [i for i in report['issues'] if i['severity'] == 'warning']
        if critical_issues:
            print("❌ CRITICAL ISSUES:")
            for entry in critical_issues:
                print(f"{entry['issue']}")
                print(f" Impact: {entry['impact']}")
        if warning_issues:
            print("\n⚠️ WARNINGS:")
            for entry in warning_issues:
                print(f"{entry['issue']}")
                print(f" Impact: {entry['impact']}")
    else:
        print("\n✅ NO ISSUES DETECTED")

    print("\n" + heavy_rule)
def generate_data_quality_report_simple(df):
    """
    Generate a simple data quality summary (quick check).

    Args:
        df: DataFrame to analyze

    Returns:
        str: Pipe-separated summary string, e.g.
        "Rows: 1,000 | Columns: 8 | Valid Revenue: 990 (99.0%) | Date Coverage: 980 (98.0%)"
    """
    n_rows = len(df)
    summary_parts = [f"Rows: {n_rows:,}", f"Columns: {len(df.columns)}"]
    if REVENUE_COLUMN in df.columns:
        revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce')
        # int() so the :, format sees a plain int; guard the empty-frame
        # case to avoid a divide-by-zero warning and a "nan%" summary.
        valid = int(revenue.notna().sum())
        valid_pct = (valid / n_rows * 100) if n_rows > 0 else 0.0
        summary_parts.append(f"Valid Revenue: {valid:,} ({valid_pct:.1f}%)")
    if DATE_COLUMN in df.columns:
        date_coverage = int(df[DATE_COLUMN].notna().sum())
        date_pct = (date_coverage / n_rows * 100) if n_rows > 0 else 0.0
        summary_parts.append(f"Date Coverage: {date_coverage:,} ({date_pct:.1f}%)")
    return " | ".join(summary_parts)
# ============================================================================
# STANDALONE DATA QUALITY CHECK
# ============================================================================
if __name__ == "__main__":
"""Run data quality check"""
from data_loader import load_sales_data
from config import get_data_path
print("Loading data for quality check...")
try:
df = load_sales_data(get_data_path())
report = generate_data_quality_report(df)
print_data_quality_report(report)
except Exception as e:
print(f"ERROR: {e}")