"""
|
|
Data quality reporting utility
|
|
Generates comprehensive data quality reports
|
|
|
|
Usage:
|
|
from data_quality import generate_data_quality_report, print_data_quality_report
|
|
|
|
# Generate and print report
|
|
report = generate_data_quality_report(df)
|
|
print_data_quality_report(report)
|
|
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
from config import (
|
|
REVENUE_COLUMN, DATE_COLUMN, CUSTOMER_COLUMN, ITEM_COLUMN,
|
|
QUANTITY_COLUMN, MIN_QUANTITY, MAX_QUANTITY
|
|
)
|
|
|
|
def _iqr_outlier_stats(series, n_rows):
    """Return 1.5*IQR-fence outlier statistics for a numeric Series.

    Args:
        series: Numeric pandas Series (NaNs are ignored by the comparisons).
        n_rows: Total row count used for the percentage denominator.

    Returns:
        dict with 'count', 'percentage', 'lower_bound', 'upper_bound'.
    """
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    # NaN compares False on both sides, so missing values never count as outliers.
    count = int(((series < lower_bound) | (series > upper_bound)).sum())
    return {
        'count': count,
        'percentage': (count / n_rows) * 100 if n_rows > 0 else 0,
        'lower_bound': float(lower_bound),
        'upper_bound': float(upper_bound)
    }


def generate_data_quality_report(df):
    """
    Generate comprehensive data quality report.

    Args:
        df: DataFrame to analyze.

    Returns:
        dict: Data quality metrics under keys 'overview', 'missing_values',
            'duplicates', 'outliers', 'data_types', 'date_coverage',
            'revenue_summary' and 'issues' (a list of
            {'severity', 'issue', 'impact'} dicts).
    """
    n_rows = len(df)

    report = {
        'overview': {},
        'missing_values': {},
        'duplicates': {},
        'outliers': {},
        'data_types': {},
        'date_coverage': {},
        'revenue_summary': {},
        'issues': []
    }

    # Overview
    report['overview'] = {
        'total_rows': n_rows,
        'total_columns': len(df.columns),
        'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2
    }

    # Missing values. Guard the percentage against an empty frame: dividing
    # by zero rows would yield inf/NaN percentages instead of clean zeros.
    missing = df.isnull().sum()
    missing_pct = (missing / n_rows) * 100 if n_rows > 0 else missing.astype(float)
    report['missing_values'] = {
        'by_column': missing[missing > 0].to_dict(),
        'percentages': missing_pct[missing > 0].to_dict(),
        'total_missing': int(missing.sum()),  # cast numpy int for consistency/JSON
        'columns_with_missing': int((missing > 0).sum())
    }

    # Duplicates
    duplicate_rows = df.duplicated().sum()
    report['duplicates'] = {
        'duplicate_rows': int(duplicate_rows),
        'duplicate_percentage': (duplicate_rows / n_rows) * 100 if n_rows > 0 else 0
    }

    # Outliers (revenue and quantity)
    outliers = {}

    if REVENUE_COLUMN in df.columns:
        revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce')
        revenue_stats = _iqr_outlier_stats(revenue, n_rows)
        # Negative revenue is tracked separately from the IQR fences: it may
        # be legitimate (e.g. refunds) or a data error, so it is reported
        # alongside the outlier counts rather than folded into them.
        revenue_stats['negative_values'] = int((revenue < 0).sum())
        outliers['revenue'] = revenue_stats

    if QUANTITY_COLUMN in df.columns:
        quantity = pd.to_numeric(df[QUANTITY_COLUMN], errors='coerce')
        # Use config thresholds if available; otherwise fall back to IQR fences.
        if MIN_QUANTITY is not None and MAX_QUANTITY is not None:
            below_min = int((quantity < MIN_QUANTITY).sum())
            above_max = int((quantity > MAX_QUANTITY).sum())
            # The two conditions are disjoint, so the total is their sum.
            quantity_outliers = below_min + above_max
            outliers['quantity'] = {
                'count': quantity_outliers,
                'percentage': (quantity_outliers / n_rows) * 100 if n_rows > 0 else 0,
                'below_min': below_min,
                'above_max': above_max
            }
        else:
            outliers['quantity'] = _iqr_outlier_stats(quantity, n_rows)

    report['outliers'] = outliers

    # Data types
    report['data_types'] = {
        'numeric_columns': list(df.select_dtypes(include=[np.number]).columns),
        'datetime_columns': list(df.select_dtypes(include=['datetime64']).columns),
        'object_columns': list(df.select_dtypes(include=['object']).columns),
        'type_summary': df.dtypes.value_counts().to_dict()
    }

    # Date coverage
    if DATE_COLUMN in df.columns:
        rows_with_dates = int(df[DATE_COLUMN].notna().sum())
        report['date_coverage'] = {
            'total_rows': n_rows,
            'rows_with_dates': rows_with_dates,
            'coverage_percentage': (rows_with_dates / n_rows) * 100 if n_rows > 0 else 0,
            'min_date': str(df[DATE_COLUMN].min()) if rows_with_dates > 0 else None,
            'max_date': str(df[DATE_COLUMN].max()) if rows_with_dates > 0 else None
        }

    # Revenue summary (only populated when at least one value coerces cleanly)
    if REVENUE_COLUMN in df.columns:
        valid_revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce').dropna()

        if len(valid_revenue) > 0:
            report['revenue_summary'] = {
                'total_revenue': float(valid_revenue.sum()),
                'mean_revenue': float(valid_revenue.mean()),
                'median_revenue': float(valid_revenue.median()),
                'min_revenue': float(valid_revenue.min()),
                'max_revenue': float(valid_revenue.max()),
                'std_revenue': float(valid_revenue.std()),
                'valid_rows': int(len(valid_revenue)),
                'invalid_rows': int(n_rows - len(valid_revenue))
            }

    # Identify issues
    issues = []

    # Critical issues
    if report['missing_values']['columns_with_missing'] > 0:
        high_missing = {k: v for k, v in report['missing_values']['percentages'].items() if v > 50}
        if high_missing:
            issues.append({
                'severity': 'critical',
                'issue': f"Columns with >50% missing values: {list(high_missing.keys())}",
                'impact': 'High'
            })

    if DATE_COLUMN in df.columns:
        if report['date_coverage']['coverage_percentage'] < 50:
            issues.append({
                'severity': 'critical',
                'issue': f"Date coverage is only {report['date_coverage']['coverage_percentage']:.1f}%",
                'impact': 'High - analyses may fail'
            })

    if REVENUE_COLUMN in df.columns:
        # .get() because revenue_summary stays empty when no value is valid.
        if report['revenue_summary'].get('invalid_rows', 0) > n_rows * 0.1:
            issues.append({
                'severity': 'critical',
                'issue': f"{report['revenue_summary']['invalid_rows']} rows have invalid revenue values",
                'impact': 'High'
            })

    # Warnings
    if report['duplicates']['duplicate_percentage'] > 5:
        issues.append({
            'severity': 'warning',
            'issue': f"{report['duplicates']['duplicate_rows']} duplicate rows ({report['duplicates']['duplicate_percentage']:.1f}%)",
            'impact': 'Medium'
        })

    if 'revenue' in outliers:
        if outliers['revenue']['percentage'] > 10:
            issues.append({
                'severity': 'warning',
                'issue': f"{outliers['revenue']['count']} revenue outliers ({outliers['revenue']['percentage']:.1f}%)",
                'impact': 'Medium'
            })

    report['issues'] = issues

    return report
|
|
|
|
def print_data_quality_report(report):
    """
    Print a formatted data quality report to stdout.

    Args:
        report: Dictionary produced by generate_data_quality_report().
    """
    bar = "=" * 70
    rule = "-" * 70

    def _header(title):
        # Section heading followed by a horizontal rule.
        print(title)
        print(rule)

    print("\n" + bar)
    print("DATA QUALITY REPORT")
    print(bar)

    # Overview
    overview = report['overview']
    _header("\n📊 OVERVIEW")
    print(f"Total Rows: {overview['total_rows']:,}")
    print(f"Total Columns: {overview['total_columns']}")
    print(f"Memory Usage: {overview['memory_usage_mb']:.2f} MB")

    # Missing values
    mv = report['missing_values']
    _header("\n🔍 MISSING VALUES")
    if mv['columns_with_missing'] > 0:
        print(f"Columns with missing values: {mv['columns_with_missing']}")
        print(f"Total missing values: {mv['total_missing']:,}")
        print("\nTop columns by missing values:")
        # Worst-affected columns first, capped at ten rows of output.
        worst = sorted(mv['percentages'].items(), key=lambda kv: kv[1], reverse=True)[:10]
        for col, pct in worst:
            print(f"  {col:30s}: {mv['by_column'][col]:8,} ({pct:5.1f}%)")
    else:
        print("✅ No missing values found")

    # Duplicates
    dup = report['duplicates']
    _header("\n🔄 DUPLICATES")
    if dup['duplicate_rows'] > 0:
        print(f"Duplicate Rows: {dup['duplicate_rows']:,} ({dup['duplicate_percentage']:.2f}%)")
    else:
        print("✅ No duplicate rows found")

    # Outliers
    out = report['outliers']
    _header("\n📈 OUTLIERS")
    if 'revenue' in out:
        rev = out['revenue']
        print(f"Revenue Outliers: {rev['count']:,} ({rev['percentage']:.2f}%)")
        if rev.get('negative_values', 0) > 0:
            print(f"  Negative Revenue Values: {rev['negative_values']:,}")
    if 'quantity' in out:
        qty = out['quantity']
        print(f"Quantity Outliers: {qty['count']:,} ({qty['percentage']:.2f}%)")
    if not out:
        print("✅ No significant outliers detected")

    # Date coverage (section omitted entirely when the report has none)
    dc = report['date_coverage']
    if dc:
        _header("\n📅 DATE COVERAGE")
        print(f"Rows with Dates: {dc['rows_with_dates']:,} / {dc['total_rows']:,} ({dc['coverage_percentage']:.1f}%)")
        if dc['min_date']:
            print(f"Date Range: {dc['min_date']} to {dc['max_date']}")

    # Revenue summary (section omitted when empty)
    rs = report['revenue_summary']
    if rs:
        _header("\n💰 REVENUE SUMMARY")
        print(f"Total Revenue: ${rs['total_revenue'] / 1e6:.2f}m")
        print(f"Valid Rows: {rs['valid_rows']:,} / {rs['valid_rows'] + rs['invalid_rows']:,}")
        if rs['invalid_rows'] > 0:
            print(f"Invalid Rows: {rs['invalid_rows']:,}")
        print(f"Mean: ${rs['mean_revenue']:,.2f}")
        print(f"Median: ${rs['median_revenue']:,.2f}")
        print(f"Min: ${rs['min_revenue']:,.2f}")
        print(f"Max: ${rs['max_revenue']:,.2f}")

    # Issues, critical first, then warnings
    issues = report['issues']
    if issues:
        _header("\n⚠️ ISSUES DETECTED")
        critical = [item for item in issues if item['severity'] == 'critical']
        warning_items = [item for item in issues if item['severity'] == 'warning']

        if critical:
            print("❌ CRITICAL ISSUES:")
            for item in critical:
                print(f"  • {item['issue']}")
                print(f"    Impact: {item['impact']}")

        if warning_items:
            print("\n⚠️ WARNINGS:")
            for item in warning_items:
                print(f"  • {item['issue']}")
                print(f"    Impact: {item['impact']}")
    else:
        print("\n✅ NO ISSUES DETECTED")

    print("\n" + bar)
|
|
|
|
def generate_data_quality_report_simple(df):
    """
    Generate a simple data quality summary (quick check).

    Args:
        df: DataFrame to analyze.

    Returns:
        str: One-line summary string, parts joined by " | ".
    """
    n_rows = len(df)
    summary_parts = [
        f"Rows: {n_rows:,}",
        f"Columns: {len(df.columns)}",
    ]

    if REVENUE_COLUMN in df.columns:
        revenue = pd.to_numeric(df[REVENUE_COLUMN], errors='coerce')
        valid = int(revenue.notna().sum())
        # Guard against an empty frame: valid/len(df) would divide by zero.
        valid_pct = (valid / n_rows * 100) if n_rows > 0 else 0.0
        summary_parts.append(f"Valid Revenue: {valid:,} ({valid_pct:.1f}%)")

    if DATE_COLUMN in df.columns:
        date_coverage = int(df[DATE_COLUMN].notna().sum())
        coverage_pct = (date_coverage / n_rows * 100) if n_rows > 0 else 0.0
        summary_parts.append(f"Date Coverage: {date_coverage:,} ({coverage_pct:.1f}%)")

    return " | ".join(summary_parts)
|
|
|
|
# ============================================================================
|
|
# STANDALONE DATA QUALITY CHECK
|
|
# ============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
"""Run data quality check"""
|
|
from data_loader import load_sales_data
|
|
from config import get_data_path
|
|
|
|
print("Loading data for quality check...")
|
|
try:
|
|
df = load_sales_data(get_data_path())
|
|
report = generate_data_quality_report(df)
|
|
print_data_quality_report(report)
|
|
except Exception as e:
|
|
print(f"ERROR: {e}")
|