12 KiB
12 KiB
Business Logic Security Reference
Overview
Business logic vulnerabilities occur when the application's logic can be manipulated to achieve unintended outcomes. Unlike technical vulnerabilities, these flaws exploit legitimate functionality in unexpected ways.
Common Vulnerability Types
1. Race Conditions
Time-of-Check to Time-of-Use (TOCTOU)
# VULNERABLE: Race condition in balance check
def transfer(from_account, to_account, amount):
    """Move ``amount`` between two accounts (deliberately vulnerable example).

    The balance is checked, then the function sleeps before mutating either
    balance, and no lock is held across that gap. The flaw is intentional:
    this snippet illustrates a time-of-check to time-of-use race.
    """
    if from_account.balance < amount:  # Check
        return
    time.sleep(0.1)  # Simulated processing delay between check and use
    from_account.balance = from_account.balance - amount  # Use
    to_account.balance = to_account.balance + amount
# Attack: Two concurrent transfers can overdraft
# SAFE: Atomic operation with locking
from contextlib import ExitStack
from threading import Lock

# Registry of per-account locks, keyed by account id. Entries are created
# lazily by _get_account_lock() so callers never hit a missing key.
account_locks = {}
_account_locks_guard = Lock()


def _get_account_lock(account_id):
    """Return the lock for ``account_id``, creating it on first use."""
    # The guard keeps two threads from each creating a different Lock
    # object for the same account.
    with _account_locks_guard:
        lock = account_locks.get(account_id)
        if lock is None:
            lock = account_locks[account_id] = Lock()
        return lock


def transfer(from_account, to_account, amount):
    """Atomically move ``amount`` from ``from_account`` to ``to_account``.

    Args:
        from_account: Object with ``id`` and ``balance``; the account debited.
        to_account: Object with ``id`` and ``balance``; the account credited.
        amount: Amount to move. Must be positive.

    Returns:
        True if the balances were updated, False if the amount is not
        positive or the source balance is insufficient.
    """
    # A negative amount would pass the balance check and move money in the
    # opposite direction, so reject it before taking any lock.
    if amount <= 0:
        return False

    # Acquire locks in consistent order to prevent deadlock. The set
    # collapses a same-account transfer to a single lock: threading.Lock is
    # not reentrant, so acquiring it twice would block forever.
    lock_ids = sorted({from_account.id, to_account.id})
    with ExitStack() as held:
        for lock_id in lock_ids:
            held.enter_context(_get_account_lock(lock_id))
        if from_account.balance >= amount:
            from_account.balance -= amount
            to_account.balance += amount
            return True
        return False
Database-Level Locking
# SAFE: Database transaction with SELECT FOR UPDATE
from django.db import transaction
@transaction.atomic
def transfer(from_account_id, to_account_id, amount):
    """Move ``amount`` between two accounts inside one database transaction.

    Both rows are fetched with ``select_for_update()`` so they stay locked
    until the surrounding transaction ends.

    Args:
        from_account_id: Id of the account to debit.
        to_account_id: Id of the account to credit.
        amount: Amount to move. Must be positive.

    Returns:
        True if the balances were updated and saved, False if the amount is
        not positive or the source balance is insufficient.

    Raises:
        Account.DoesNotExist: If either id does not match a row.
    """
    # A negative amount would pass the balance check and reverse the
    # direction of the transfer.
    if amount <= 0:
        return False

    # Lock rows in a consistent (sorted) order so that concurrent A->B and
    # B->A transfers cannot deadlock. The set also guarantees a same-account
    # transfer loads the row once; two separate copies of one row would let
    # the second save() overwrite the debit and create money.
    locked = {}
    for account_id in sorted({from_account_id, to_account_id}):
        locked[account_id] = Account.objects.select_for_update().get(id=account_id)
    from_account = locked[from_account_id]
    to_account = locked[to_account_id]

    if from_account.balance >= amount:
        from_account.balance -= amount
        to_account.balance += amount
        from_account.save()
        if to_account is not from_account:
            to_account.save()
        return True
    return False
2. Workflow Bypass
# VULNERABLE: Multi-step process without server-side tracking
# Step 1: /verify-email
# Step 2: /set-password
# Step 3: /complete-registration
# Attacker skips to Step 3
# SAFE: Server-side state machine
class RegistrationFlow:
    """Server-side state machine for the multi-step registration process.

    Each step checks that the flow is in the exact state that precedes it
    before recording the next state, so steps cannot be skipped.
    """

    STATES = ['email_pending', 'email_verified', 'password_set', 'complete']

    def __init__(self, user_id):
        self.state = self.get_state(user_id)

    def _require_state(self, expected, message):
        """Raise InvalidStateError(message) unless the flow is in ``expected``."""
        if self.state == expected:
            return
        raise InvalidStateError(message)

    def verify_email(self, token):
        """Step 1: allowed only while email verification is pending."""
        self._require_state('email_pending', "Email verification not pending")
        # Verify token...
        self.set_state('email_verified')

    def set_password(self, password):
        """Step 2: allowed only after the email has been verified."""
        self._require_state('email_verified', "Email not verified")
        # Set password...
        self.set_state('password_set')

    def complete(self):
        """Step 3: allowed only after a password has been set."""
        self._require_state('password_set', "Password not set")
        # Complete registration...
        self.set_state('complete')
3. Numeric Manipulation
Integer Overflow
# VULNERABLE: No range validation on quantity (negative or oversized values accepted)
def calculate_total(quantity, price):
    """Return quantity times price with no validation (deliberately vulnerable)."""
    total = quantity * price
    return total
# Attack: quantity = -1 results in a negative total (effectively a refund)
# SAFE: Validate numeric ranges
def calculate_total(quantity, price):
    """Return ``quantity * price`` after validating both values.

    Args:
        quantity: Number of units; must satisfy 0 < quantity <= MAX_QUANTITY.
        price: Unit price; must be greater than zero.

    Returns:
        The product ``quantity * price``.

    Raises:
        ValueError: If either value is outside its allowed range.
    """
    # The checks are written as "not (value is in range)" rather than
    # "value is out of range": every ordering comparison with a float NaN is
    # False, so the out-of-range form would silently accept NaN.
    if not (0 < quantity <= MAX_QUANTITY):
        raise ValueError("Invalid quantity")
    if not (price > 0):
        raise ValueError("Invalid price")
    return quantity * price
Floating Point Issues
# VULNERABLE: Floating point precision loss
# Running total kept as a binary float, which is the source of the
# precision loss this example demonstrates.
total = 0.0
for line in items:
    line_total = line.price * line.quantity
    total = total + line_total
# 0.1 + 0.2 = 0.30000000000000004
# SAFE: Use Decimal for financial calculations
from decimal import Decimal, ROUND_HALF_UP
# Sum exact decimal line totals; each price goes through str() so a float
# price is converted from its printed form rather than its binary expansion.
total = sum(
    (Decimal(str(line.price)) * line.quantity for line in items),
    Decimal('0'),
)
# Round to two decimal places, with ties rounded away from zero.
total = total.quantize(Decimal('.01'), rounding=ROUND_HALF_UP)
4. Price/Discount Manipulation
# VULNERABLE: Trust client-submitted price
@app.route('/checkout', methods=['POST'])
def checkout():
    """Charge the price found in the request body (deliberately vulnerable)."""
    # Client can set any price!
    submitted_price = request.json['price']
    process_payment(submitted_price)
# SAFE: Calculate price server-side
@app.route('/checkout', methods=['POST'])
def checkout():
    """Charge a total computed on the server from the current user's cart."""
    user_cart = get_cart(current_user.id)
    # Always server-calculated; nothing price-related is read from the request.
    amount_due = calculate_total(user_cart)
    process_payment(amount_due)
# VULNERABLE: Stackable discounts without limits
def apply_discounts(cart, discount_codes):
    """Subtract every submitted code's amount from the cart total.

    Deliberately vulnerable: codes are neither de-duplicated nor capped.
    """
    for discount in map(get_discount, discount_codes):
        cart.total = cart.total - discount.amount
# Attack: Apply same code multiple times, negative total
# SAFE: Limit discount application
def apply_discounts(cart, discount_codes, max_discount_rate=Decimal('0.5')):
    """Apply each unused discount code once, capped at a share of the subtotal.

    Args:
        cart: Object exposing ``user_id``, ``subtotal`` and ``total``;
            ``total`` is reduced in place.
        discount_codes: Iterable of submitted codes; duplicates are ignored.
        max_discount_rate: Largest fraction of ``cart.subtotal`` that the
            combined discounts may remove. Defaults to 50%.
    """
    total_discount = Decimal('0')
    # dict.fromkeys removes duplicates while keeping submission order, so
    # codes are looked up and marked as used in a deterministic order.
    for code in dict.fromkeys(discount_codes):
        if is_code_used(cart.user_id, code):
            continue  # Code already used
        discount = get_discount(code)
        total_discount += discount.amount
        mark_code_used(cart.user_id, code)

    # Cap the combined discount at max_discount_rate of the subtotal, and
    # never let it go below zero (a negative sum would raise the total).
    max_discount = cart.subtotal * max_discount_rate
    final_discount = max(Decimal('0'), min(total_discount, max_discount))
    cart.total -= final_discount
5. Inventory/Resource Exhaustion
# VULNERABLE: No reservation during checkout
def checkout(cart):
    """Check stock, take payment, then decrement stock (deliberately vulnerable).

    Nothing is reserved between the stock check and the decrement, and the
    result of the stock check is never acted on.
    """
    for line in cart.items:
        # Stock is looked up and compared, but the outcome is discarded.
        _stock_available = get_stock(line.product_id) >= line.quantity

    # Processing takes time...
    process_payment()

    for line in cart.items:
        reduce_stock(line.product_id, line.quantity)  # May oversell
# SAFE: Reserve inventory atomically
@transaction.atomic
def checkout(cart):
    """Reserve stock for every cart line, then take payment, in one transaction.

    Args:
        cart: Object whose ``items`` each expose ``product_id`` and
            ``quantity``.

    Raises:
        InsufficientStock: If a product has less stock than the line needs.
    """
    # Lock product rows in a consistent order (by product id). Two carts that
    # hold the same products in opposite order would otherwise each take one
    # row lock and wait on the other forever.
    for item in sorted(cart.items, key=lambda line: line.product_id):
        product = Product.objects.select_for_update().get(id=item.product_id)
        if product.stock < item.quantity:
            raise InsufficientStock(product.name)
        product.stock -= item.quantity  # Reserve immediately
        product.save()

    # If payment fails, transaction rolls back
    process_payment()
6. Time-Based Attacks
# VULNERABLE: Expiry is checked against the application server's clock, so clock skew between servers can leave an expired coupon usable
def apply_coupon(code):
    """Return the coupon's discount unless it has expired (vulnerable example).

    Expiry is compared against ``datetime.now()`` evaluated in the
    application process.
    """
    coupon = Coupon.objects.get(code=code)
    still_valid = coupon.expiry > datetime.now()
    if not still_valid:
        raise CouponExpired()
    return coupon.discount
# SAFE: Use database time, not application time
from django.db.models import BooleanField, ExpressionWrapper, Q
from django.db.models.functions import Now


def apply_coupon(code):
    """Return the coupon's discount, checking expiry against the database clock.

    Args:
        code: Coupon code to look up.

    Returns:
        The ``discount`` attribute of the matching coupon.

    Raises:
        Coupon.DoesNotExist: If no coupon has this code.
        CouponExpired: If the coupon's expiry is not later than the
            database's current time.
    """
    coupon = Coupon.objects.annotate(
        # The comparison is evaluated by the database (Now()), so every
        # application server sees the same clock. ExpressionWrapper gives
        # the Q condition an explicit boolean output type for annotate().
        is_valid=ExpressionWrapper(
            Q(expiry__gt=Now()),
            output_field=BooleanField(),
        )
    ).get(code=code)
    if not coupon.is_valid:
        raise CouponExpired()
    return coupon.discount
7. Parameter Tampering
# VULNERABLE: Trust hidden form fields
# HTML: <input type="hidden" name="user_id" value="123">
@app.route('/update-profile', methods=['POST'])
def update_profile():
    """Update the user named by the submitted form field (deliberately vulnerable)."""
    # Attacker can change this!
    submitted_id = request.form['user_id']
    target = User.query.get(submitted_id)
    target.update(...)
# SAFE: Use session-based user identification
@app.route('/update-profile', methods=['POST'])
def update_profile():
    """Update the profile of the user identified by the authenticated session."""
    # The id comes from current_user, never from request data.
    account = User.query.get(current_user.id)
    account.update(...)
Detection Patterns
State Machine Validation
class OrderStateMachine:
    """Allow-list of order state transitions with a logged transition step."""

    # Maps each state to the states it may move to next. States with an
    # empty list are terminal.
    VALID_TRANSITIONS = {
        'draft': ['submitted'],
        'submitted': ['approved', 'rejected'],
        'approved': ['shipped'],
        'shipped': ['delivered', 'returned'],
        'delivered': ['returned'],
        'rejected': [],
        'returned': ['refunded'],
        'refunded': [],
    }

    def transition(self, order, new_state):
        """Move ``order`` to ``new_state`` and log the change.

        Raises:
            InvalidTransition: If ``new_state`` is not listed for the
                order's current state (unknown current states allow nothing).
        """
        current = order.state
        allowed = self.VALID_TRANSITIONS.get(current, [])
        if new_state in allowed:
            order.state = new_state
            log_state_change(order, current, new_state)
            return
        raise InvalidTransition(f"Cannot go from {current} to {new_state}")
Idempotency
# SAFE: Idempotent operations with idempotency keys
import hashlib
def process_request(request_data, idempotency_key):
    """Process a request once per idempotency key.

    If a stored record exists for ``idempotency_key`` its saved response is
    returned; otherwise the request is processed and the result is stored
    under that key before being returned.
    """
    cached = ProcessedRequest.query.filter_by(key=idempotency_key).first()
    if not cached:
        # First time this key is seen: do the work and remember the outcome
        # for future duplicate requests.
        outcome = do_processing(request_data)
        ProcessedRequest.create(key=idempotency_key, response=outcome)
        return outcome
    # Duplicate request: return cached response
    return cached.response
Rate Limiting Business Actions
# Limit business-critical actions
from functools import wraps
import time
def rate_limit_action(action_name, limit, window):
    """Build a decorator limiting how often the current user may call a function.

    Args:
        action_name: Label used in the counter key and the error message.
        limit: Maximum number of calls allowed per window.
        window: Window length passed to ``redis.expire`` (seconds).

    Returns:
        A decorator. The wrapped function raises RateLimitExceeded once the
        per-user counter for ``action_name`` exceeds ``limit``.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            user_id = current_user.id
            key = f"action:{action_name}:{user_id}"

            count = redis.incr(key)
            # INCR and EXPIRE are separate commands. If the EXPIRE after the
            # first increment is ever lost (crash, dropped connection), the
            # counter would never reset and the user would stay blocked, so
            # re-arm the expiry whenever the key has none (TTL of -1).
            if count == 1 or redis.ttl(key) == -1:
                redis.expire(key, window)

            if count > limit:
                raise RateLimitExceeded(f"Too many {action_name} attempts")

            return f(*args, **kwargs)
        return wrapper
    return decorator
@rate_limit_action('password_reset', limit=3, window=3600)
def request_password_reset(email):
    """Placeholder body: at most 3 password-reset requests per 3600-second window."""
@rate_limit_action('transfer', limit=10, window=86400)
def transfer_funds(from_account, to_account, amount):
    """Placeholder body: at most 10 transfers per 86400-second window."""
Validation Patterns
Server-Side Calculation
# Always recalculate on server
def calculate_order_total(order):
    """Recalculate an order's subtotal, tax, discount and total on the server.

    Args:
        order: Object exposing ``items`` (each with ``product_id`` and
            ``quantity``), ``shipping_address`` and ``discount_codes``.

    Returns:
        Dict with the keys ``subtotal``, ``tax``, ``discount`` and ``total``.

    Raises:
        InvalidOrderError: If a line has a non-positive quantity, a product
            cannot be found, the total is negative, or the discount exceeds
            the subtotal.
    """
    subtotal = Decimal('0')
    for item in order.items:
        # A zero or negative quantity would shrink the subtotal; written as
        # "not (> 0)" so a NaN quantity is rejected as well.
        if not (item.quantity > 0):
            raise InvalidOrderError("Invalid quantity")
        # Get current price from database, not from request
        product = Product.query.get(item.product_id)
        if product is None:
            raise InvalidOrderError("Unknown product")
        subtotal += product.price * item.quantity

    # Apply tax
    tax = subtotal * get_tax_rate(order.shipping_address)

    # Apply discounts (validated server-side)
    discount = calculate_discounts(order, order.discount_codes)

    # Calculate total
    total = subtotal + tax - discount

    # Sanity checks
    if total < Decimal('0'):
        raise InvalidOrderError("Negative total")
    if discount > subtotal:
        raise InvalidOrderError("Discount exceeds subtotal")

    return {
        'subtotal': subtotal,
        'tax': tax,
        'discount': discount,
        'total': total,
    }
Business Rule Enforcement
class TransferValidator:
    """Collects every business-rule violation for a transfer before rejecting it."""

    def validate(self, transfer):
        """Check ``transfer`` against amount, limit, velocity and recipient rules.

        Args:
            transfer: Object exposing ``amount``, ``from_account`` and
                ``to_account``.

        Raises:
            ValidationError: With the list of all violated rules, if any.
        """
        errors = []

        # A zero or negative amount would slip under every limit check
        # below, so it is rejected explicitly.
        if not (transfer.amount > 0):
            errors.append("Transfer amount must be positive")

        # Check transfer limits
        if transfer.amount > MAX_SINGLE_TRANSFER:
            errors.append("Exceeds single transfer limit")

        # Check daily limits
        daily_total = get_daily_transfer_total(transfer.from_account)
        if daily_total + transfer.amount > DAILY_LIMIT:
            errors.append("Exceeds daily transfer limit")

        # Check velocity (unusual number of transfers)
        # NOTE(review): if the count excludes the transfer being validated,
        # ">" admits one more transfer than the limit - confirm intent.
        recent_count = get_recent_transfer_count(transfer.from_account, hours=1)
        if recent_count > MAX_TRANSFERS_PER_HOUR:
            errors.append("Too many transfers in short period")

        # Check for unusual patterns
        if is_unusual_recipient(transfer.from_account, transfer.to_account):
            errors.append("Unusual recipient - requires verification")

        if errors:
            raise ValidationError(errors)
Grep Patterns for Detection
# Race condition indicators
# Each command searches recursively (-r), prints line numbers (-n) and only
# looks at *.py files. No path is given - presumably run from the project
# root; confirm for your grep implementation.
# First: lines mentioning sleeps, threads or async code.
grep -rn "sleep\|time\.sleep\|Thread\|async" --include="*.py"
# Second: lines mentioning balance/inventory/stock, minus lines that also
# contain "select_for_update" or "lock". The filter is per line, so a lock
# taken on a different line is not detected.
grep -rn "balance\|inventory\|stock" --include="*.py" | grep -v "select_for_update\|lock"
# Price/amount from request
# Matches request.<attr>[ ... ] subscripts whose line then mentions
# price, amount or total.
grep -rn "request\.\w*\[.*price\|request\.\w*\[.*amount\|request\.\w*\[.*total" --include="*.py"
# Missing validation
# Only lists checkout/purchase/transfer definitions as a starting point for
# manual review; it does not itself detect missing validation.
grep -rn "def checkout\|def purchase\|def transfer" --include="*.py"
# Floating point for money
# Lines where "float" is followed later on the same line by price, amount or balance.
grep -rn "float.*price\|float.*amount\|float.*balance" --include="*.py"
Testing Checklist
- Race conditions tested (concurrent requests)
- Workflow steps enforced server-side
- State transitions validated
- Prices/totals calculated server-side
- Discount limits enforced
- Inventory checked and reserved atomically
- Integer overflow/underflow prevented
- Decimal used for financial calculations
- Time-based logic uses server/database time
- Hidden field values not trusted
- Idempotency keys for critical operations
- Rate limits on business-critical actions
- Unusual patterns detected and flagged