"""
Data processing utilities

Common data cleaning and transformation helpers.

Usage:
    from data_processing import clean_data, create_pivot_table, prepare_time_series

    # Clean data
    df_clean = clean_data(df)

    # Create pivot table
    pivot = create_pivot_table(df, index='Year', columns='Product', values='Revenue')
"""

import pandas as pd
import numpy as np

from config import (
    DATE_COLUMN,
    MAX_QUANTITY,
    MIN_QUANTITY,
    QUANTITY_COLUMN,
    REVENUE_COLUMN,
)

def clean_data(df, remove_duplicates=True, handle_missing_dates=True):
    """
    Apply common cleaning steps to a DataFrame.

    The input is never modified; all work happens on a copy.

    Args:
        df: DataFrame to clean
        remove_duplicates: Whether to drop duplicate rows
        handle_missing_dates: Whether to report rows with missing dates

    Returns:
        DataFrame: Cleaned copy of the input
    """
    cleaned = df.copy()

    if remove_duplicates:
        before = len(cleaned)
        cleaned = cleaned.drop_duplicates()
        removed = before - len(cleaned)
        if removed > 0:
            print(f"Removed {removed:,} duplicate rows")

    # Missing dates are only reported, never dropped.
    if handle_missing_dates and DATE_COLUMN in cleaned.columns:
        missing_dates = cleaned[DATE_COLUMN].isna().sum()
        if missing_dates > 0:
            print(f"Warning: {missing_dates:,} rows have missing dates")

    # Negative revenue is reported but deliberately kept; enable the
    # commented line below to drop those rows instead.
    if REVENUE_COLUMN in cleaned.columns:
        negative_revenue = (cleaned[REVENUE_COLUMN] < 0).sum()
        if negative_revenue > 0:
            print(f"Found {negative_revenue:,} rows with negative revenue")
            # cleaned = cleaned[cleaned[REVENUE_COLUMN] >= 0]

    return cleaned

def create_pivot_table(df, index, columns=None, values=None, aggfunc='sum', fill_value=0):
    """
    Build a pivot table with sensible defaults.

    Args:
        df: Source DataFrame
        index: Column(s) to use as the pivot index
        columns: Column(s) to spread across the pivot columns
        values: Column(s) to aggregate; when omitted, falls back to the
            configured revenue column if the DataFrame has it
        aggfunc: Aggregation function (default: 'sum')
        fill_value: Replacement for missing cells (default: 0)

    Returns:
        DataFrame: The pivot table
    """
    target_values = values
    if target_values is None and REVENUE_COLUMN in df.columns:
        target_values = REVENUE_COLUMN

    return pd.pivot_table(
        df,
        index=index,
        columns=columns,
        values=target_values,
        aggfunc=aggfunc,
        fill_value=fill_value,
    )

def prepare_time_series(df, date_column=None, value_column=None, freq='M'):
    """
    Prepare time series data by resampling a value column over dates.

    Args:
        df: Source DataFrame
        date_column: Date column name (defaults to config.DATE_COLUMN)
        value_column: Value column to aggregate (defaults to config.REVENUE_COLUMN)
        freq: Resampling frequency ('D', 'W', 'M', 'Q', 'Y')

    Returns:
        Series: Value totals indexed by the resampled periods

    Raises:
        ValueError: If either column is missing from the DataFrame
    """
    if date_column is None:
        date_column = DATE_COLUMN
    if value_column is None:
        value_column = REVENUE_COLUMN

    if date_column not in df.columns:
        raise ValueError(f"Date column '{date_column}' not found")
    if value_column not in df.columns:
        raise ValueError(f"Value column '{value_column}' not found")

    # Work on a copy: date parsing rewrites the date column.
    df = df.copy()
    df[date_column] = pd.to_datetime(df[date_column], errors='coerce')

    # Rows whose dates failed to parse become NaT; drop them explicitly
    # instead of relying on resample's handling of NaT in the index.
    df = df.dropna(subset=[date_column])

    # Set the dates as index, then resample and sum per period.
    time_series = df.set_index(date_column)[value_column].resample(freq).sum()

    return time_series

def aggregate_by_period(df, period='year', date_column=None, value_column=None):
    """
    Aggregate a value column by calendar period.

    Args:
        df: Source DataFrame
        period: Period type ('year', 'month', 'quarter')
        date_column: Date column name (defaults to config.DATE_COLUMN)
        value_column: Value column to aggregate (defaults to config.REVENUE_COLUMN)

    Returns:
        DataFrame: One row per period with Total, Count and Average columns

    Raises:
        ValueError: If the period type is not recognised
    """
    date_column = DATE_COLUMN if date_column is None else date_column
    value_column = REVENUE_COLUMN if value_column is None else value_column

    work = df.copy()
    work[date_column] = pd.to_datetime(work[date_column], errors='coerce')

    # Dispatch table mapping each period type to its extractor.
    extractors = {
        'year': lambda s: s.dt.year,
        'month': lambda s: s.dt.to_period('M'),
        'quarter': lambda s: s.dt.to_period('Q'),
    }
    if period not in extractors:
        raise ValueError(f"Unknown period: {period}")
    work['Period'] = extractors[period](work[date_column])

    stats = work.groupby('Period')[value_column].agg(['sum', 'count', 'mean']).reset_index()
    stats.columns = ['Period', 'Total', 'Count', 'Average']

    return stats

def filter_outliers(df, column, method='iqr', lower_bound=None, upper_bound=None):
    """
    Drop rows whose value in *column* falls outside computed bounds.

    Args:
        df: Source DataFrame
        column: Column name to filter on
        method: 'iqr' (1.5x interquartile range) or 'zscore' (3 standard deviations)
        lower_bound: Manual lower bound, overriding the computed one
        upper_bound: Manual upper bound, overriding the computed one

    Returns:
        DataFrame: Copy of *df* with out-of-bounds rows removed

    Raises:
        ValueError: If the method is not recognised
    """
    series = df[column]

    if method == 'iqr':
        q1, q3 = series.quantile(0.25), series.quantile(0.75)
        spread = q3 - q1
        auto_lower, auto_upper = q1 - 1.5 * spread, q3 + 1.5 * spread
    elif method == 'zscore':
        center, scale = series.mean(), series.std()
        auto_lower, auto_upper = center - 3 * scale, center + 3 * scale
    else:
        raise ValueError(f"Unknown method: {method}")

    # Manual bounds take precedence over the computed ones.
    lower = auto_lower if lower_bound is None else lower_bound
    upper = auto_upper if upper_bound is None else upper_bound

    df_filtered = df.copy()
    initial_count = len(df_filtered)
    in_range = (df_filtered[column] >= lower) & (df_filtered[column] <= upper)
    df_filtered = df_filtered[in_range]
    removed = initial_count - len(df_filtered)

    if removed > 0:
        print(f"Removed {removed:,} outliers from {column} ({removed/initial_count*100:.1f}%)")

    return df_filtered

def normalize_column(df, column, method='min_max'):
    """
    Return a normalized copy of one column.

    Args:
        df: Source DataFrame
        column: Column name to normalize
        method: 'min_max' (scale to [0, 1]) or 'zscore' (standard score)

    Returns:
        Series: Normalized values; all zeros when the column is constant

    Raises:
        ValueError: If the method is not recognised
    """
    values = df[column]

    if method == 'min_max':
        low, high = values.min(), values.max()
        span = high - low
        if span == 0:
            # Constant column: avoid a division by zero.
            return pd.Series([0] * len(df), index=df.index)
        return (values - low) / span

    if method == 'zscore':
        center, scale = values.mean(), values.std()
        if scale == 0:
            # Zero variance: every z-score would be undefined.
            return pd.Series([0] * len(df), index=df.index)
        return (values - center) / scale

    raise ValueError(f"Unknown method: {method}")

def create_derived_columns(df):
    """
    Add common derived columns without overwriting existing ones.

    Adds Year, Month, Quarter and YearMonth (derived from the configured
    date column) and Price_Per_Unit (revenue / quantity) whenever the
    required source columns are present.

    Args:
        df: Source DataFrame

    Returns:
        DataFrame: Copy of *df* with the derived columns added
    """
    df_derived = df.copy()

    if DATE_COLUMN in df_derived.columns:
        df_derived[DATE_COLUMN] = pd.to_datetime(df_derived[DATE_COLUMN], errors='coerce')
        dates = df_derived[DATE_COLUMN]

        # Each derived column is created only when absent, so values the
        # caller precomputed are preserved.
        date_parts = {
            'Year': dates.dt.year,
            'Month': dates.dt.month,
            'Quarter': dates.dt.quarter,
            'YearMonth': dates.dt.to_period('M'),
        }
        for name, part in date_parts.items():
            if name not in df_derived.columns:
                df_derived[name] = part

    # Price per unit; zero quantities are mapped to NaN so the division
    # never raises or produces inf.
    if QUANTITY_COLUMN in df_derived.columns and REVENUE_COLUMN in df_derived.columns:
        df_derived['Price_Per_Unit'] = (
            df_derived[REVENUE_COLUMN] / df_derived[QUANTITY_COLUMN].replace(0, np.nan)
        )

    return df_derived

# ============================================================================
# EXAMPLE USAGE
# ============================================================================

if __name__ == "__main__":
    # Example usage; seeded so the demo output is reproducible run to run.
    rng = np.random.default_rng(42)

    # Create sample data
    df = pd.DataFrame({
        'InvoiceDate': pd.date_range('2023-01-01', periods=100, freq='D'),
        'USD': rng.normal(1000, 200, 100),
        'Quantity': rng.integers(1, 100, 100)
    })

    # Clean data
    df_clean = clean_data(df)
    print(f"Cleaned data: {len(df_clean)} rows")

    # Create pivot table
    df_clean['Year'] = df_clean['InvoiceDate'].dt.year
    pivot = create_pivot_table(df_clean, index='Year', values='USD')
    print("\nPivot table:")
    print(pivot)

    # Prepare time series (relies on the configured date/revenue columns)
    ts = prepare_time_series(df_clean, freq='M')
    print(f"\nTime series: {len(ts)} periods")