322 lines
8.2 KiB
Python
322 lines
8.2 KiB
Python
"""
|
|
Statistical analysis utilities
|
|
Common statistical operations for sales analysis
|
|
|
|
Usage:
|
|
from statistical_utils import calculate_yoy_growth, calculate_cagr, calculate_correlation
|
|
|
|
# Calculate year-over-year growth
|
|
growth = calculate_yoy_growth(current=100, previous=90)
|
|
|
|
# Calculate CAGR
|
|
cagr = calculate_cagr(start_value=100, end_value=150, periods=3)
|
|
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
from scipy import stats
|
|
|
|
def calculate_yoy_growth(current, previous):
    """
    Calculate year-over-year growth percentage

    Args:
        current: Current period value
        previous: Previous period value

    Returns:
        float: Growth percentage (can be negative). When ``previous`` is 0
        the ratio is undefined: NaN is returned if ``current`` is 0 or NaN,
        otherwise an infinity carrying the sign of ``current``.

    Example:
        calculate_yoy_growth(110, 100)  # Returns 10.0
        calculate_yoy_growth(90, 100)   # Returns -10.0
        calculate_yoy_growth(-5, 0)     # Returns -inf
    """
    if previous == 0:
        if current == 0 or np.isnan(current):
            return np.nan
        # Growth from a zero base is unbounded; keep the direction of the
        # move so that a drop below zero is not reported as +inf.
        return np.inf if current > 0 else -np.inf

    return ((current - previous) / previous) * 100
|
|
|
|
def calculate_cagr(start_value, end_value, periods):
    """
    Compute the Compound Annual Growth Rate (CAGR)

    Args:
        start_value: Value at the beginning of the span
        end_value: Value at the end of the span
        periods: Number of periods (years) between the two values

    Returns:
        float: CAGR expressed as a percentage, or NaN when any of the
        three inputs is zero or negative

    Example:
        calculate_cagr(100, 150, 3)  # Returns ~14.47%
    """
    # All three inputs must be strictly positive for the formula to apply
    if start_value <= 0 or periods <= 0 or end_value <= 0:
        return np.nan

    growth_multiple = end_value / start_value
    per_period_factor = growth_multiple ** (1 / periods)
    return (per_period_factor - 1) * 100
|
|
|
|
def calculate_correlation(df, col1, col2):
    """
    Correlate two columns of a DataFrame

    Both columns are coerced to numeric (unparseable entries become NaN)
    and only rows where both values are present are used.

    Args:
        df: DataFrame
        col1: First column name
        col2: Second column name

    Returns:
        float: Correlation coefficient (-1 to 1), or NaN when either
        column is missing or fewer than two complete pairs remain
    """
    available = df.columns
    if not (col1 in available and col2 in available):
        return np.nan

    # Coerce to numbers; anything that cannot be parsed turns into NaN
    left = pd.to_numeric(df[col1], errors='coerce')
    right = pd.to_numeric(df[col2], errors='coerce')

    # Keep only the rows where both sides hold a value
    both_present = left.notna() & right.notna()
    if both_present.sum() < 2:
        return np.nan

    return left[both_present].corr(right[both_present])
|
|
|
|
def calculate_trend_slope(y_values):
    """
    Calculate linear trend slope

    Fits a least-squares line (``scipy.stats.linregress``) through the
    points (0, y[0]), (1, y[1]), ... and returns its slope. NaN entries
    are dropped, but the remaining points keep their original x positions.

    Args:
        y_values: One-dimensional array-like of y values (list, tuple,
            NumPy array or pandas Series); ``None`` entries count as NaN

    Returns:
        float: Slope of linear trend, or NaN when fewer than two non-NaN
        values are available
    """
    # Normalise to a float ndarray so the boolean-mask indexing below also
    # works for plain lists/tuples and for inputs containing None.
    y_array = np.asarray(y_values, dtype=float)
    if y_array.size < 2:
        return np.nan

    x_array = np.arange(y_array.size)

    # Remove NaN values
    valid_mask = ~np.isnan(y_array)
    if valid_mask.sum() < 2:
        return np.nan

    # linregress returns (slope, intercept, r, p, stderr); only slope is used
    slope = stats.linregress(x_array[valid_mask], y_array[valid_mask])[0]
    return float(slope)
|
|
|
|
def calculate_percent_change(series, periods=1):
    """
    Percent change of a series relative to an earlier element

    Args:
        series: Pandas Series
        periods: Number of periods to shift (default: 1)

    Returns:
        Series: Percent change (pandas ``pct_change`` scaled by 100)
    """
    fractional_change = series.pct_change(periods=periods)
    return fractional_change * 100
|
|
|
|
def calculate_moving_average(series, window=3):
    """
    Trailing (non-centred) moving average

    Args:
        series: Pandas Series
        window: Window size for moving average

    Returns:
        Series: Moving average
    """
    trailing_window = series.rolling(window=window, center=False)
    return trailing_window.mean()
|
|
|
|
def calculate_volatility(series, window=12):
    """
    Rolling volatility, measured as the standard deviation over a
    trailing (non-centred) window

    Args:
        series: Pandas Series
        window: Window size for rolling calculation

    Returns:
        Series: Rolling volatility
    """
    trailing_window = series.rolling(window=window, center=False)
    return trailing_window.std()
|
|
|
|
def calculate_z_score(value, mean, std):
    """
    Standardise a value against a distribution

    Args:
        value: Value to score
        mean: Mean of distribution
        std: Standard deviation of distribution

    Returns:
        float: Z-score, or NaN when ``std`` is 0
    """
    # With zero spread the score is undefined, so report NaN instead of dividing
    return np.nan if std == 0 else (value - mean) / std
|
|
|
|
def test_statistical_significance(group1, group2, alpha=0.05):
    """
    Test statistical significance between two groups (t-test)

    NaN values are dropped from each group, then SciPy's independent
    two-sample t-test (``stats.ttest_ind`` with its default settings) is run.

    Args:
        group1: First group (array-like of numbers; None counts as NaN)
        group2: Second group (array-like of numbers; None counts as NaN)
        alpha: Significance level (default: 0.05)

    Returns:
        dict: Always contains 'p_value', 'test_statistic', 'significant'
        (a plain bool) and 'alpha'. When either group has fewer than two
        usable values the statistics are NaN, 'significant' is False and
        an 'error' message is included. Otherwise the dict also carries
        'group1_mean', 'group2_mean', 'group1_std' and 'group2_std'
        (computed with ``np.std`` defaults).
    """
    # Coerce to float arrays so integer and object inputs (including None)
    # can be NaN-filtered; np.isnan rejects object-dtype arrays.
    group1 = np.asarray(group1, dtype=float)
    group2 = np.asarray(group2, dtype=float)

    # Remove NaN values
    group1 = group1[~np.isnan(group1)]
    group2 = group2[~np.isnan(group2)]

    if len(group1) < 2 or len(group2) < 2:
        return {
            'p_value': np.nan,
            'significant': False,
            'test_statistic': np.nan,
            'alpha': alpha,
            'error': 'Insufficient data'
        }

    # Perform t-test
    t_statistic, p_value = stats.ttest_ind(group1, group2)

    return {
        'p_value': float(p_value),
        # bool() keeps the flag a builtin like the other values; a NaN
        # p-value compares False and is therefore reported as not significant
        'significant': bool(p_value < alpha),
        'test_statistic': float(t_statistic),
        'alpha': alpha,
        'group1_mean': float(np.mean(group1)),
        'group2_mean': float(np.mean(group2)),
        'group1_std': float(np.std(group1)),
        'group2_std': float(np.std(group2))
    }


# The public name starts with "test_", which matches pytest's default
# discovery pattern; mark it so pytest does not try to collect this library
# function as a test when it is imported into a test module.
test_statistical_significance.__test__ = False
|
|
|
|
def calculate_confidence_interval(series, confidence=0.95):
    """
    Calculate confidence interval for a series

    Uses the normal approximation: mean +/- z * (std / sqrt(n)), where
    ``std`` is the pandas sample standard deviation of the non-NaN values
    and ``z`` comes from ``stats.norm.ppf``.

    Args:
        series: Pandas Series
        confidence: Confidence level (default: 0.95 for 95%)

    Returns:
        dict: 'mean', 'lower', 'upper', 'confidence' and 'margin'. All
        numeric entries are NaN when the series has no non-NaN values.
    """
    series_clean = series.dropna()

    if len(series_clean) == 0:
        # Same keys as the normal result so callers can read 'margin'
        # without special-casing empty input.
        return {
            'mean': np.nan,
            'lower': np.nan,
            'upper': np.nan,
            'confidence': confidence,
            'margin': np.nan
        }

    mean = series_clean.mean()
    std = series_clean.std()
    n = len(series_clean)

    # Calculate standard error
    se = std / np.sqrt(n)

    # Calculate critical value (z-score for normal distribution)
    alpha = 1 - confidence
    z_critical = stats.norm.ppf(1 - alpha / 2)

    margin = z_critical * se

    return {
        'mean': float(mean),
        'lower': float(mean - margin),
        'upper': float(mean + margin),
        'confidence': confidence,
        'margin': float(margin)
    }
|
|
|
|
def calculate_annual_growth_rates(values, years):
    """
    Build a table of year-over-year growth for annual data

    Args:
        values: Array-like of annual values
        years: Array-like of corresponding years

    Returns:
        DataFrame: Columns 'Year', 'Value', 'YoY_Growth' (percent change
        from the previous row) and 'YoY_Change' (absolute difference from
        the previous row)
    """
    table = pd.DataFrame({'Year': years, 'Value': values})

    annual_values = table['Value']
    table['YoY_Growth'] = calculate_percent_change(annual_values)
    table['YoY_Change'] = annual_values.diff()

    return table
|
|
|
|
def calculate_seasonality_index(monthly_series):
    """
    Calculate seasonality index for monthly data

    Each calendar month's average is divided by the average of the whole
    series. A zero overall average is not guarded against.

    Args:
        monthly_series: Series with datetime index (monthly frequency)

    Returns:
        Series: Seasonality index keyed by calendar month number, for the
        months present in the data
        (1.0 = average, >1.0 = above average, <1.0 = below average)

    Raises:
        ValueError: If the index is not a DatetimeIndex
    """
    if not isinstance(monthly_series.index, pd.DatetimeIndex):
        raise ValueError("Series must have DatetimeIndex")

    # Group on the month taken straight from the index. A Series has no
    # columns, so storing a 'Month' entry on it and grouping by that name
    # does not work.
    month_keys = pd.Index(monthly_series.index.month, name='Month')

    # Calculate average by month
    monthly_avg = monthly_series.groupby(month_keys).mean()
    overall_avg = monthly_series.mean()

    # Calculate seasonality index
    return monthly_avg / overall_avg
|
|
|
|
# ============================================================================
|
|
# EXAMPLE USAGE
|
|
# ============================================================================
|
|
|
|
def _run_examples():
    """Example usage"""
    # YoY Growth
    yoy_pct = calculate_yoy_growth(110, 100)
    print(f"Year-over-year growth: {yoy_pct:.2f}%")

    # CAGR
    cagr_pct = calculate_cagr(100, 150, 3)
    print(f"CAGR: {cagr_pct:.2f}%")

    # Sample data for correlation
    sample = pd.DataFrame({
        'Revenue': [100, 110, 120, 130, 140],
        'Quantity': [10, 11, 12, 13, 14]
    })
    revenue_vs_quantity = calculate_correlation(sample, 'Revenue', 'Quantity')
    print(f"Correlation: {revenue_vs_quantity:.2f}")


if __name__ == "__main__":
    _run_examples()
|