Files
sales-data-analysis/data_processing.py
Jonathan Pressnell cf0b596449 Initial commit: sales analysis template
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-06 09:16:34 -05:00

286 lines
8.9 KiB
Python

"""
Data processing utilities
Common data cleaning and transformation helpers
Usage:
from data_processing import clean_data, create_pivot_table, prepare_time_series
# Clean data
df_clean = clean_data(df)
# Create pivot table
pivot = create_pivot_table(df, index='Year', columns='Product', values='Revenue')
"""
import pandas as pd
import numpy as np
from config import REVENUE_COLUMN, DATE_COLUMN, MIN_QUANTITY, MAX_QUANTITY
def clean_data(df, remove_duplicates=True, handle_missing_dates=True):
    """
    Apply common cleaning steps to a sales DataFrame.

    The input is never modified in place; all work happens on a copy.
    Diagnostics (duplicate counts, missing dates, negative revenue) are
    printed but only duplicates are actually removed.

    Args:
        df: DataFrame to clean.
        remove_duplicates: Drop exact duplicate rows when True.
        handle_missing_dates: Warn about rows with missing dates when True.

    Returns:
        DataFrame: Cleaned copy of the input.
    """
    cleaned = df.copy()

    # Drop exact duplicate rows and report how many were removed.
    if remove_duplicates:
        before = len(cleaned)
        cleaned = cleaned.drop_duplicates()
        dropped = before - len(cleaned)
        if dropped > 0:
            print(f"Removed {dropped:,} duplicate rows")

    # Missing dates are reported but kept — downstream code may coerce them.
    if handle_missing_dates and DATE_COLUMN in cleaned.columns:
        n_missing = cleaned[DATE_COLUMN].isna().sum()
        if n_missing > 0:
            print(f"Warning: {n_missing:,} rows have missing dates")

    # Negative revenue is flagged only; removal is left to the caller.
    if REVENUE_COLUMN in cleaned.columns:
        n_negative = (cleaned[REVENUE_COLUMN] < 0).sum()
        if n_negative > 0:
            print(f"Found {n_negative:,} rows with negative revenue")
            # Optionally remove: cleaned = cleaned[cleaned[REVENUE_COLUMN] >= 0]

    return cleaned
def create_pivot_table(df, index, columns=None, values=None, aggfunc='sum', fill_value=0):
    """
    Build a pivot table with project-friendly defaults.

    When `values` is omitted, the configured revenue column is used if the
    DataFrame has it.

    Args:
        df: Source DataFrame.
        index: Column(s) to use as the pivot index.
        columns: Column(s) to spread across pivot columns.
        values: Column(s) to aggregate (defaults to the revenue column).
        aggfunc: Aggregation function, 'sum' by default.
        fill_value: Fill for empty cells, 0 by default.

    Returns:
        DataFrame: The resulting pivot table.
    """
    # Only consult the config default when the caller gave no values column.
    if values is None and REVENUE_COLUMN in df.columns:
        values = REVENUE_COLUMN

    return pd.pivot_table(
        df,
        index=index,
        columns=columns,
        values=values,
        aggfunc=aggfunc,
        fill_value=fill_value,
    )
def prepare_time_series(df, date_column=None, value_column=None, freq='M'):
    """
    Turn a transactional DataFrame into a resampled value time series.

    Args:
        df: Source DataFrame (left unmodified; a copy is taken).
        date_column: Date column name; falls back to config.DATE_COLUMN.
        value_column: Column to aggregate; falls back to config.REVENUE_COLUMN.
        freq: Resample frequency code ('D', 'W', 'M', 'Q', 'Y').

    Returns:
        Series: Sum of `value_column` per `freq` period.

    Raises:
        ValueError: If either column is missing from `df`.
    """
    # Config defaults are only looked up when the caller omits the argument.
    date_column = DATE_COLUMN if date_column is None else date_column
    value_column = REVENUE_COLUMN if value_column is None else value_column

    # Validate up front so the failure message names the missing column.
    if date_column not in df.columns:
        raise ValueError(f"Date column '{date_column}' not found")
    if value_column not in df.columns:
        raise ValueError(f"Value column '{value_column}' not found")

    # Coerce dates on a copy; unparseable values become NaT.
    work = df.copy()
    work[date_column] = pd.to_datetime(work[date_column], errors='coerce')

    # Resampling requires a DatetimeIndex.
    return work.set_index(date_column)[value_column].resample(freq).sum()
def aggregate_by_period(df, period='year', date_column=None, value_column=None):
    """
    Aggregate a value column by a calendar period.

    Args:
        df: Source DataFrame (copied, not modified).
        period: One of 'year', 'month', 'quarter'.
        date_column: Date column; falls back to config.DATE_COLUMN.
        value_column: Value column; falls back to config.REVENUE_COLUMN.

    Returns:
        DataFrame: Columns ['Period', 'Total', 'Count', 'Average'].

    Raises:
        ValueError: On an unrecognized `period`.
    """
    # Only touch config defaults when the caller leaves arguments as None.
    date_column = DATE_COLUMN if date_column is None else date_column
    value_column = REVENUE_COLUMN if value_column is None else value_column

    work = df.copy()
    work[date_column] = pd.to_datetime(work[date_column], errors='coerce')

    # Dispatch table instead of an if/elif chain; lambdas keep it lazy.
    period_extractors = {
        'year': lambda s: s.dt.year,
        'month': lambda s: s.dt.to_period('M'),
        'quarter': lambda s: s.dt.to_period('Q'),
    }
    try:
        extract = period_extractors[period]
    except KeyError:
        raise ValueError(f"Unknown period: {period}") from None
    work['Period'] = extract(work[date_column])

    summary = (
        work.groupby('Period')[value_column]
        .agg(['sum', 'count', 'mean'])
        .reset_index()
    )
    summary.columns = ['Period', 'Total', 'Count', 'Average']
    return summary
def filter_outliers(df, column, method='iqr', lower_bound=None, upper_bound=None):
    """
    Drop rows whose value in `column` falls outside computed bounds.

    Args:
        df: Source DataFrame (copied, not modified).
        column: Numeric column to filter on.
        method: 'iqr' (1.5×IQR fences) or 'zscore' (±3 standard deviations).
        lower_bound: Manual lower fence; overrides the computed one.
        upper_bound: Manual upper fence; overrides the computed one.

    Returns:
        DataFrame: Rows with `column` inside [lower, upper], inclusive.

    Raises:
        ValueError: On an unrecognized `method`.
    """
    # Derive default fences from the chosen method.
    if method == 'iqr':
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        spread = 1.5 * (q3 - q1)
        default_lower, default_upper = q1 - spread, q3 + spread
    elif method == 'zscore':
        center = df[column].mean()
        scale = df[column].std()
        default_lower, default_upper = center - 3 * scale, center + 3 * scale
    else:
        raise ValueError(f"Unknown method: {method}")

    # Explicit bounds from the caller always win.
    lower = default_lower if lower_bound is None else lower_bound
    upper = default_upper if upper_bound is None else upper_bound

    result = df.copy()
    total = len(result)
    inside = (result[column] >= lower) & (result[column] <= upper)
    result = result[inside]

    n_removed = total - len(result)
    if n_removed > 0:
        print(f"Removed {n_removed:,} outliers from {column} ({n_removed/total*100:.1f}%)")
    return result
def normalize_column(df, column, method='min_max'):
    """
    Normalize a column.

    Args:
        df: DataFrame
        column: Column name to normalize
        method: Normalization method ('min_max', 'zscore')

    Returns:
        Series: Normalized values (float dtype). Degenerate inputs — a
        constant column, or a single row under 'zscore' — return all
        zeros instead of NaN/inf.

    Raises:
        ValueError: If method is not 'min_max' or 'zscore'.
    """
    if method == 'min_max':
        min_val = df[column].min()
        max_val = df[column].max()
        span = max_val - min_val
        # Constant column: avoid division by zero. Use float zeros so the
        # dtype matches the normal (float-division) path; the original
        # returned an int-dtype Series here.
        if span == 0:
            return pd.Series(0.0, index=df.index)
        return (df[column] - min_val) / span
    elif method == 'zscore':
        mean = df[column].mean()
        std = df[column].std()
        # std is 0 for a constant column and NaN for a single row; the
        # original only checked == 0, so a single-row input produced an
        # all-NaN result instead of zeros.
        if std == 0 or pd.isna(std):
            return pd.Series(0.0, index=df.index)
        return (df[column] - mean) / std
    else:
        raise ValueError(f"Unknown method: {method}")
def create_derived_columns(df):
    """
    Add commonly used derived columns to a copy of the DataFrame.

    When the configured date column exists, Year/Month/Quarter/YearMonth
    are added (each only if not already present). When both quantity and
    revenue columns exist, Price_Per_Unit is added with zero quantities
    treated as NaN to avoid division by zero.

    Args:
        df: Source DataFrame (not modified in place).

    Returns:
        DataFrame: Copy of `df` with the derived columns appended.
    """
    out = df.copy()

    if DATE_COLUMN in out.columns:
        out[DATE_COLUMN] = pd.to_datetime(out[DATE_COLUMN], errors='coerce')
        # Each derived date part is added only when not already present.
        date_parts = {
            'Year': lambda s: s.dt.year,
            'Month': lambda s: s.dt.month,
            'Quarter': lambda s: s.dt.quarter,
            'YearMonth': lambda s: s.dt.to_period('M'),
        }
        for name, extract in date_parts.items():
            if name not in out.columns:
                out[name] = extract(out[DATE_COLUMN])

    # Local import mirrors the original: QUANTITY_COLUMN is only needed here.
    from config import QUANTITY_COLUMN
    if QUANTITY_COLUMN in out.columns and REVENUE_COLUMN in out.columns:
        # replace(0, NaN) turns zero-quantity rows into NaN prices
        # instead of raising or producing inf.
        safe_quantity = out[QUANTITY_COLUMN].replace(0, np.nan)
        out['Price_Per_Unit'] = out[REVENUE_COLUMN] / safe_quantity

    return out
# ============================================================================
# EXAMPLE USAGE
# ============================================================================
if __name__ == "__main__":
    # Demonstrate the helpers on a small synthetic dataset.
    sample = pd.DataFrame({
        'InvoiceDate': pd.date_range('2023-01-01', periods=100, freq='D'),
        'USD': np.random.normal(1000, 200, 100),
        'Quantity': np.random.randint(1, 100, 100),
    })

    # Clean data
    cleaned = clean_data(sample)
    print(f"Cleaned data: {len(cleaned)} rows")

    # Pivot by year
    cleaned['Year'] = cleaned['InvoiceDate'].dt.year
    yearly = create_pivot_table(cleaned, index='Year', values='USD')
    print("\nPivot table:")
    print(yearly)

    # Monthly time series
    monthly = prepare_time_series(cleaned, freq='M')
    print(f"\nTime series: {len(monthly)} periods")