Files
virtual_board_member/app/models/audit_log.py
2025-08-07 16:11:14 -04:00

162 lines
6.0 KiB
Python

"""
Audit log models for the Virtual Board Member AI System.
"""
from datetime import datetime
from typing import Optional, Dict, Any
from sqlalchemy import Column, String, DateTime, Text, Integer, ForeignKey, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
import uuid
import enum
from app.core.database import Base
class AuditEventType(str, enum.Enum):
"""Audit event types."""
USER_LOGIN = "user_login"
USER_LOGOUT = "user_logout"
USER_CREATED = "user_created"
USER_UPDATED = "user_updated"
USER_DELETED = "user_deleted"
DOCUMENT_UPLOADED = "document_uploaded"
DOCUMENT_ACCESSED = "document_accessed"
DOCUMENT_DOWNLOADED = "document_downloaded"
DOCUMENT_DELETED = "document_deleted"
DOCUMENT_PROCESSED = "document_processed"
COMMITMENT_CREATED = "commitment_created"
COMMITMENT_UPDATED = "commitment_updated"
COMMITMENT_COMPLETED = "commitment_completed"
COMMITMENT_DELETED = "commitment_deleted"
QUERY_EXECUTED = "query_executed"
REPORT_GENERATED = "report_generated"
SYSTEM_CONFIGURATION_CHANGED = "system_configuration_changed"
SECURITY_EVENT = "security_event"
COMPLIANCE_EVENT = "compliance_event"
class AuditLog(Base):
"""Audit log model for compliance and security tracking."""
__tablename__ = "audit_logs"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
# Event information
event_type = Column(String(100), nullable=False, index=True)
event_description = Column(Text, nullable=True)
# User information
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
user_email = Column(String(255), nullable=True) # Denormalized for performance
user_role = Column(String(50), nullable=True) # Denormalized for performance
# Resource information
resource_type = Column(String(50), nullable=True) # document, commitment, user, etc.
resource_id = Column(UUID(as_uuid=True), nullable=True)
resource_name = Column(String(500), nullable=True)
# Request information
ip_address = Column(String(45), nullable=True) # IPv4 or IPv6
user_agent = Column(Text, nullable=True)
request_method = Column(String(10), nullable=True)
request_url = Column(Text, nullable=True)
request_headers = Column(JSONB, nullable=True)
# Response information
response_status_code = Column(Integer, nullable=True)
response_time_ms = Column(Integer, nullable=True)
# Additional data
event_metadata = Column(JSONB, nullable=True) # Additional event-specific data
severity = Column(String(20), default="info") # info, warning, error, critical
# Compliance fields
compliance_category = Column(String(100), nullable=True) # SOX, GDPR, etc.
data_classification = Column(String(50), nullable=True) # public, internal, confidential, restricted
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
# Indexes for performance
__table_args__ = (
Index('idx_audit_logs_user_event', 'user_id', 'event_type'),
Index('idx_audit_logs_resource', 'resource_type', 'resource_id'),
Index('idx_audit_logs_created_at', 'created_at'),
Index('idx_audit_logs_compliance', 'compliance_category', 'created_at'),
)
def __repr__(self):
return f"<AuditLog(id={self.id}, event_type='{self.event_type}', user_id={self.user_id})>"
@classmethod
def log_user_login(cls, user_id: str, user_email: str, user_role: str,
ip_address: str, user_agent: str, success: bool,
**kwargs) -> "AuditLog":
"""Log user login event."""
return cls(
event_type=AuditEventType.USER_LOGIN,
event_description=f"User login attempt - {'successful' if success else 'failed'}",
user_id=user_id,
user_email=user_email,
user_role=user_role,
ip_address=ip_address,
user_agent=user_agent,
severity="warning" if not success else "info",
event_metadata={"success": success, **kwargs}
)
@classmethod
def log_document_access(cls, user_id: str, user_email: str, document_id: str,
document_name: str, action: str, ip_address: str,
**kwargs) -> "AuditLog":
"""Log document access event."""
return cls(
event_type=AuditEventType.DOCUMENT_ACCESSED,
event_description=f"Document {action}: {document_name}",
user_id=user_id,
user_email=user_email,
resource_type="document",
resource_id=document_id,
resource_name=document_name,
ip_address=ip_address,
event_metadata={"action": action, **kwargs}
)
@classmethod
def log_query_execution(cls, user_id: str, user_email: str, query: str,
response_time_ms: int, result_count: int,
**kwargs) -> "AuditLog":
"""Log query execution event."""
return cls(
event_type=AuditEventType.QUERY_EXECUTED,
event_description=f"Query executed: {query[:100]}...",
user_id=user_id,
user_email=user_email,
ip_address=kwargs.get("ip_address"),
response_time_ms=response_time_ms,
event_metadata={
"query": query,
"result_count": result_count,
**kwargs
}
)
@classmethod
def log_security_event(cls, event_type: str, severity: str,
description: str, user_id: str = None,
ip_address: str = None, **kwargs) -> "AuditLog":
"""Log security event."""
return cls(
event_type=event_type,
event_description=description,
user_id=user_id,
ip_address=ip_address,
severity=severity,
event_metadata=kwargs
)