Files
HomeAudit/comprehensive_discovery_results/container_audit_results/MIGRATION_VALIDATION_TESTS.py
admin ef122ca019 Add comprehensive Future-Proof Scalability migration playbook and scripts
- Add MIGRATION_PLAYBOOK.md with detailed 4-phase migration strategy
- Add FUTURE_PROOF_SCALABILITY_PLAN.md with end-state architecture
- Add migration_scripts/ with automated migration tools:
  - Docker Swarm setup and configuration
  - Traefik v3 reverse proxy deployment
  - Service migration automation
  - Backup and validation scripts
  - Monitoring and security hardening
- Add comprehensive discovery results and audit data
- Include zero-downtime migration strategy with rollback capabilities

This provides a complete world-class migration solution for converting
from current infrastructure to Future-Proof Scalability architecture.
2025-08-24 13:18:47 -04:00

617 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Container Migration Validation Tests
This script provides validation tests to ensure containers are functioning
identically after migration. It tests all critical aspects of container
operation including network connectivity, data persistence, and functionality.
"""
import json
import yaml
import subprocess
import requests
import time
import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
import socket
import mysql.connector
import psycopg2
import redis
from datetime import datetime
class ContainerMigrationValidator:
    """Run post-migration checks against containers described by audit configs.

    Each ``test_*`` method returns one result dict (or a list of them) with at
    least the keys ``test``, ``container``, ``status`` and ``message``.
    ``status`` is one of ``PASS``, ``FAIL``, ``WARN``, ``SKIP``, ``ERROR`` or
    ``UNKNOWN``.  Aggregated results are kept in ``self.test_results``.
    """

    # Health endpoints for well-known services, keyed by service name.
    HEALTH_ENDPOINTS = {
        'nextcloud': ['/status.php', '/ocs/v1.php/apps/files_external/api/v1/mounts'],
        'homeassistant': ['/api/', '/api/states'],
        'portainer': ['/api/system/status'],
        'jellyfin': ['/health', '/system/info/public'],
        'gitea': ['/api/healthz'],
        'immich': ['/api/server-info/ping'],
        'paperless': ['/api/', '/api/documents/'],
        'adguardhome': ['/control/status'],
        'vaultwarden': ['/alive'],
        'n8n': ['/healthz'],
        'uptime-kuma': ['/api/status-page'],
        'dozzle': ['/api/logs'],
        'code-server': ['/healthz']
    }
    # Endpoints tried when the container name matches no known service.
    GENERIC_HEALTH_ENDPOINTS = ['/', '/health', '/api/health', '/status', '/ping']
    # Ports that are always treated as HTTP, regardless of the >= 3000 heuristic.
    WEB_PORTS = frozenset({80, 443, 8080, 8443})
    # Database ports that must never receive an HTTP probe.
    NON_HTTP_PORTS = frozenset({3306, 5432, 6379, 33060})
    # Substring used by the audit output to hide secret values.
    MASK_MARKER = '***'
    # Upper bound (seconds) for every docker CLI invocation.
    DOCKER_TIMEOUT = 30

    def __init__(self, config_dir: str):
        """Remember the config directory and start with empty aggregate results."""
        self.config_dir = Path(config_dir)
        self.test_results = {
            'timestamp': datetime.now().isoformat(),
            'tests_run': 0,
            'tests_passed': 0,
            'tests_failed': 0,
            'container_results': {},
            'critical_failures': []
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_service_name(name: str) -> str:
        """Lower-case *name* and drop '-' and '_' so names compare loosely."""
        return name.lower().replace('-', '').replace('_', '')

    def _health_endpoints_for(self, container_name: str) -> List[str]:
        """Return the health endpoints to probe for *container_name*.

        Both the container name and the table keys are normalized; otherwise
        hyphenated keys such as 'uptime-kuma' could never match.
        """
        service_name = self._normalize_service_name(container_name)
        for service, endpoints in self.HEALTH_ENDPOINTS.items():
            if self._normalize_service_name(service) in service_name:
                return list(endpoints)
        return list(self.GENERIC_HEALTH_ENDPOINTS)

    @staticmethod
    def _split_port(container_port: str):
        """Split '80/tcp' into ``(80, 'tcp')``; the protocol defaults to tcp.

        Raises ValueError when the port part is not an integer.
        """
        number, _, protocol = str(container_port).partition('/')
        return int(number), (protocol or 'tcp')

    @classmethod
    def _is_web_port(cls, container_port: str) -> bool:
        """Heuristically decide whether *container_port* serves HTTP."""
        try:
            port_num, protocol = cls._split_port(container_port)
        except ValueError:
            return False
        if protocol != 'tcp' or port_num in cls.NON_HTTP_PORTS:
            return False
        # >= 3000 so that services listening on exactly 3000 are probed too.
        return port_num in cls.WEB_PORTS or port_num >= 3000

    @staticmethod
    def _port_bindings(config: Dict[str, Any]) -> List[Any]:
        """Return ``(container_port, binding)`` pairs for every published port.

        Ports that are exposed but not published (null bindings) yield nothing.
        """
        ports = (config.get('ports') or {}).get('bindings') or {}
        pairs = []
        for container_port, bindings in ports.items():
            for binding in bindings or []:
                pairs.append((str(container_port), binding or {}))
        return pairs

    @staticmethod
    def _resolve_host(binding: Dict[str, Any]) -> str:
        """Map empty and wildcard bind addresses to 'localhost'."""
        host_ip = binding.get('host_ip') or 'localhost'
        if host_ip in ('0.0.0.0', '::'):
            return 'localhost'
        return host_ip

    @staticmethod
    def _directory_size(path: str) -> Any:
        """Return the total size in bytes of files under *path*, or 'unknown'."""
        try:
            return sum(
                os.path.getsize(os.path.join(dirpath, filename))
                for dirpath, _dirnames, filenames in os.walk(path)
                for filename in filenames
            )
        except OSError:
            return 'unknown'

    def _discover_container_names(self) -> List[str]:
        """Derive container names from ``*_config.yaml`` files in config_dir.

        File stems are expected to look like
        ``host_timestamp_containername_config``; stems that leave no container
        name after removing host, timestamp and 'config' are ignored.
        """
        names = []
        for config_file in sorted(self.config_dir.glob("*_config.yaml")):
            parts = config_file.stem.split('_')
            name = '_'.join(parts[2:-1])
            if name and name not in names:
                names.append(name)
        return names

    def _database_targets(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build connection descriptions for database ports the container publishes."""
        image = str((config.get('image') or {}).get('tag') or '').lower()
        env = config.get('environment') or {}
        if isinstance(env, list):
            # Docker's native "KEY=value" list form.
            env = dict(item.split('=', 1) for item in env
                       if isinstance(item, str) and '=' in item)
        env_text = str(env)

        engines = (
            ('mysql', 3306, 'mysql' in image or 'mariadb' in image or 'MYSQL_' in env_text),
            ('postgresql', 5432, 'postgres' in image or 'POSTGRES_' in env_text),
            ('redis', 6379, 'redis' in image or 'valkey' in image),
        )
        bindings = self._port_bindings(config)
        targets = []
        for engine, default_port, detected in engines:
            if not detected:
                continue
            for container_port, binding in bindings:
                # Compare the port number exactly: '33060/tcp' is not 3306.
                if container_port.split('/')[0] != str(default_port):
                    continue
                target = {
                    'type': engine,
                    'host': self._resolve_host(binding),
                    'port': binding.get('host_port'),
                }
                if engine == 'mysql':
                    target['user'] = env.get('MYSQL_USER', 'root')
                    target['password'] = env.get('MYSQL_PASSWORD', env.get('MYSQL_ROOT_PASSWORD', ''))
                    target['database'] = env.get('MYSQL_DATABASE', 'mysql')
                elif engine == 'postgresql':
                    target['user'] = env.get('POSTGRES_USER', 'postgres')
                    target['password'] = env.get('POSTGRES_PASSWORD', '')
                    target['database'] = env.get('POSTGRES_DB', 'postgres')
                else:
                    target['password'] = env.get('REDIS_PASSWORD', '')
                targets.append(target)
        return targets

    @staticmethod
    def _connect_database(target: Dict[str, Any], password: str) -> None:
        """Open and immediately release a connection to *target*; raise on failure."""
        port = int(target['port'])
        if target['type'] == 'mysql':
            conn = mysql.connector.connect(
                host=target['host'],
                port=port,
                user=target['user'],
                password=password,
                database=target['database'],
                connection_timeout=5
            )
            conn.close()
        elif target['type'] == 'postgresql':
            conn = psycopg2.connect(
                host=target['host'],
                port=port,
                user=target['user'],
                password=password,
                database=target['database'],
                connect_timeout=5
            )
            conn.close()
        elif target['type'] == 'redis':
            client = redis.Redis(
                host=target['host'],
                port=port,
                password=password or None,
                socket_timeout=5
            )
            try:
                client.ping()
            finally:
                # Release pooled sockets instead of waiting for garbage collection.
                client.connection_pool.disconnect()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def load_container_config(self, container_name: str) -> Dict[str, Any]:
        """Load container configuration from audit files.

        Raises:
            FileNotFoundError: no ``*{container_name}_config.yaml`` file exists.
        """
        # Sorted so the chosen file is deterministic when several match.
        config_files = sorted(self.config_dir.glob(f"*{container_name}_config.yaml"))
        if not config_files:
            raise FileNotFoundError(f"No config found for {container_name}")
        with open(config_files[0], 'r', encoding='utf-8') as f:
            # An empty YAML document loads as None; callers expect a mapping.
            return yaml.safe_load(f) or {}

    def test_container_running(self, container_name: str) -> Dict[str, Any]:
        """Test if container is running and healthy."""
        result = {
            'test': 'container_running',
            'container': container_name,
            'status': 'UNKNOWN',
            'message': '',
            'details': {}
        }
        try:
            cmd_result = subprocess.run(
                ['docker', 'ps', '--filter', f'name={container_name}', '--format', 'json'],
                capture_output=True, text=True, timeout=self.DOCKER_TIMEOUT)
            if cmd_result.returncode != 0:
                result['status'] = 'ERROR'
                result['message'] = f"Docker command failed: {cmd_result.stderr}"
                return result

            containers = [json.loads(line)
                          for line in cmd_result.stdout.splitlines() if line.strip()]
            if not containers:
                result['status'] = 'FAIL'
                result['message'] = 'Container not found or not running'
                return result

            # The name filter matches substrings, so prefer an exact name match
            # and only fall back to the first hit when there is none.
            container = next(
                (c for c in containers
                 if container_name in str(c.get('Names', '')).split(',')),
                containers[0])
            result['status'] = 'PASS' if container['State'] == 'running' else 'FAIL'
            result['message'] = f"Container state: {container['State']}"
            result['details'] = {
                'state': container['State'],
                'status': container.get('Status', ''),
                'ports': container.get('Ports', ''),
                'image': container.get('Image', '')
            }
        except Exception as e:
            result['status'] = 'ERROR'
            result['message'] = f"Exception during test: {str(e)}"
        return result

    def test_port_connectivity(self, container_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test port connectivity for container services.

        TCP ports are probed with a 5 second connect; other protocols are
        reported as SKIP.
        """
        results = []
        for container_port, binding in self._port_bindings(config):
            host_ip = self._resolve_host(binding)
            host_port = binding.get('host_port')
            result = {
                'test': 'port_connectivity',
                'container': container_name,
                'port': f"{host_ip}:{host_port}",
                'container_port': container_port,
                'status': 'UNKNOWN',
                'message': ''
            }
            try:
                _, protocol = self._split_port(container_port)
                if protocol == 'tcp':
                    # The context manager closes the socket even if connect raises.
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(5)
                        reachable = sock.connect_ex((host_ip, int(host_port))) == 0
                    if reachable:
                        result['status'] = 'PASS'
                        result['message'] = 'Port is accessible'
                    else:
                        result['status'] = 'FAIL'
                        result['message'] = 'Port is not accessible'
                else:
                    result['status'] = 'SKIP'
                    result['message'] = 'UDP port testing not implemented'
            except Exception as e:
                result['status'] = 'ERROR'
                result['message'] = f'Error testing port: {str(e)}'
            results.append(result)
        return results

    def test_web_service_health(self, container_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test web service health endpoints.

        For every published port that looks like HTTP, endpoints are tried in
        order until one answers with a status below 400.  Every attempt,
        including the successful one, is recorded.
        """
        results = []
        endpoints = self._health_endpoints_for(container_name)
        for container_port, binding in self._port_bindings(config):
            if not self._is_web_port(container_port):
                continue
            host_ip = self._resolve_host(binding)
            host_port = binding.get('host_port')
            for endpoint in endpoints:
                result = {
                    'test': 'web_service_health',
                    'container': container_name,
                    'url': f"http://{host_ip}:{host_port}{endpoint}",
                    'status': 'UNKNOWN',
                    'message': '',
                    'response_time': None,
                    'status_code': None
                }
                try:
                    start_time = time.time()
                    response = requests.get(result['url'], timeout=10, allow_redirects=True)
                    result['response_time'] = round(time.time() - start_time, 3)
                    result['status_code'] = response.status_code
                    if response.status_code < 400:
                        result['status'] = 'PASS'
                        result['message'] = f'Service responding (HTTP {response.status_code})'
                    else:
                        result['status'] = 'WARN'
                        result['message'] = f'Service returned HTTP {response.status_code}'
                except requests.exceptions.ConnectionError:
                    result['status'] = 'FAIL'
                    result['message'] = 'Connection refused or service not responding'
                except requests.exceptions.Timeout:
                    result['status'] = 'FAIL'
                    result['message'] = 'Request timeout'
                except Exception as e:
                    result['status'] = 'ERROR'
                    result['message'] = f'Error testing endpoint: {str(e)}'
                results.append(result)
                # Stop only after the result is recorded, so PASS is not lost.
                if result['status'] == 'PASS':
                    break
        return results

    def test_volume_mounts(self, container_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test volume mount accessibility and data persistence.

        Mounts without a source, or with a source under /var/lib/docker, are
        skipped.
        """
        results = []
        mounts = (config.get('volumes') or {}).get('detailed_mounts') or []
        for mount in mounts:
            source = mount.get('source')
            destination = mount.get('destination')
            if not source or source.startswith('/var/lib/docker'):
                continue  # Skip Docker internal volumes
            result = {
                'test': 'volume_mount',
                'container': container_name,
                'source': source,
                'destination': destination,
                'status': 'UNKNOWN',
                'message': '',
                'details': {}
            }
            try:
                if not os.path.exists(source):
                    result['status'] = 'FAIL'
                    result['message'] = 'Source directory does not exist'
                    result['details']['source_exists'] = False
                else:
                    details = result['details']
                    details['source_exists'] = True
                    details['source_readable'] = os.access(source, os.R_OK)
                    if os.path.isdir(source):
                        details['size_bytes'] = self._directory_size(source)
                    try:
                        # 'test -e' because bind mounts may be single files
                        # (sockets, config files), not only directories.
                        mount_check = subprocess.run(
                            ['docker', 'exec', container_name, 'test', '-e', destination],
                            capture_output=True, timeout=self.DOCKER_TIMEOUT)
                    except Exception:
                        result['status'] = 'WARN'
                        result['message'] = 'Could not verify mount in container'
                    else:
                        if mount_check.returncode == 0:
                            result['status'] = 'PASS'
                            result['message'] = 'Volume mount is accessible'
                        else:
                            result['status'] = 'WARN'
                            result['message'] = 'Mount point not accessible in container'
            except Exception as e:
                result['status'] = 'ERROR'
                result['message'] = f'Error testing volume mount: {str(e)}'
            results.append(result)
        return results

    def test_database_connectivity(self, container_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test database connectivity for database containers.

        MySQL/MariaDB, PostgreSQL and Redis/Valkey are detected from the image
        tag or environment.  A target whose password contains the mask marker
        is reported as SKIP because the real secret is not available.
        """
        results = []
        for db_test in self._database_targets(config):
            result = {
                'test': 'database_connectivity',
                'container': container_name,
                'database_type': db_test['type'],
                'connection_string': f"{db_test['type']}://{db_test['host']}:{db_test['port']}",
                'status': 'UNKNOWN',
                'message': ''
            }
            password = db_test.get('password') or ''
            try:
                if self.MASK_MARKER in str(password):
                    result['status'] = 'SKIP'
                    result['message'] = 'Password is masked, cannot test connectivity'
                else:
                    self._connect_database(db_test, password)
                    result['status'] = 'PASS'
                    if db_test['type'] == 'redis':
                        result['message'] = 'Redis connection successful'
                    else:
                        result['message'] = 'Database connection successful'
            except Exception as e:
                result['status'] = 'FAIL'
                result['message'] = f'Database connection failed: {str(e)}'
            results.append(result)
        return results

    def test_device_access(self, container_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test device access for containers with device mappings."""
        results = []
        # A container without device mappings may store this list as null.
        devices = (config.get('devices') or {}).get('devices') or []
        for device in devices:
            host_path = device.get('PathOnHost')
            container_path = device.get('PathInContainer')
            permissions = device.get('CgroupPermissions', 'rwm')
            result = {
                'test': 'device_access',
                'container': container_name,
                'host_device': host_path,
                'container_device': container_path,
                'permissions': permissions,
                'status': 'UNKNOWN',
                'message': ''
            }
            try:
                if os.path.exists(host_path):
                    result['host_device_exists'] = True
                    device_check = subprocess.run(
                        ['docker', 'exec', container_name, 'test', '-e', container_path],
                        capture_output=True, timeout=self.DOCKER_TIMEOUT)
                    if device_check.returncode == 0:
                        result['status'] = 'PASS'
                        result['message'] = 'Device is accessible in container'
                    else:
                        result['status'] = 'FAIL'
                        result['message'] = 'Device not accessible in container'
                else:
                    result['status'] = 'FAIL'
                    result['message'] = 'Device does not exist on host'
                    result['host_device_exists'] = False
            except Exception as e:
                result['status'] = 'ERROR'
                result['message'] = f'Error testing device access: {str(e)}'
            results.append(result)
        return results

    def validate_container(self, container_name: str) -> Dict[str, Any]:
        """Run comprehensive validation for a single container.

        Returns a summary dict that always carries ``tests_run``,
        ``tests_passed``, ``tests_failed``, ``tests_error`` and ``tests``, even
        when the configuration cannot be found.
        """
        print(f"🧪 Testing container: {container_name}")
        try:
            config = self.load_container_config(container_name)
        except FileNotFoundError:
            # Zero counters keep the aggregation in run_all_validations working.
            return {
                'container': container_name,
                'status': 'ERROR',
                'message': 'Container configuration not found',
                'tests_run': 0,
                'tests_passed': 0,
                'tests_failed': 0,
                'tests_error': 0,
                'tests': []
            }

        all_tests = []
        # Test 1: Container running status
        print(" ✓ Testing container status...")
        all_tests.append(self.test_container_running(container_name))
        # Tests 2-6 each return a list of results.
        stages = (
            (" ✓ Testing port connectivity...", self.test_port_connectivity),
            (" ✓ Testing web service health...", self.test_web_service_health),
            (" ✓ Testing volume mounts...", self.test_volume_mounts),
            (" ✓ Testing database connectivity...", self.test_database_connectivity),
            (" ✓ Testing device access...", self.test_device_access),
        )
        for banner, stage in stages:
            print(banner)
            all_tests.extend(stage(container_name, config))

        # Summarize results
        passed = sum(1 for t in all_tests if t['status'] == 'PASS')
        failed = sum(1 for t in all_tests if t['status'] == 'FAIL')
        errors = sum(1 for t in all_tests if t['status'] == 'ERROR')
        if failed > 0:
            overall_status = 'FAIL'
        elif errors > 0:
            overall_status = 'ERROR'
        else:
            overall_status = 'PASS'
        return {
            'container': container_name,
            'status': overall_status,
            'tests_run': len(all_tests),
            'tests_passed': passed,
            'tests_failed': failed,
            'tests_error': errors,
            'tests': all_tests
        }

    def run_all_validations(self, container_names: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run validation tests for all containers or specified containers.

        When *container_names* is None the names are derived from the
        ``*_config.yaml`` files in the config directory.  Returns
        ``self.test_results`` after updating it in place.
        """
        if container_names is None:
            container_names = self._discover_container_names()
        print(f"🚀 Starting validation tests for {len(container_names)} containers...")
        print("=" * 60)
        for container_name in container_names:
            result = self.validate_container(container_name)
            self.test_results['container_results'][container_name] = result
            self.test_results['tests_run'] += result['tests_run']
            self.test_results['tests_passed'] += result['tests_passed']
            self.test_results['tests_failed'] += result['tests_failed']
            if result['status'] == 'FAIL':
                self.test_results['critical_failures'].append({
                    'container': container_name,
                    'failed_tests': [t for t in result['tests'] if t['status'] == 'FAIL']
                })
            print(f" 📊 {container_name}: {result['status']} ({result['tests_passed']}/{result['tests_run']} passed)")
        print("\n" + "=" * 60)
        print("🏁 Validation Complete!")
        print(f"📊 Total Tests: {self.test_results['tests_run']}")
        print(f"✅ Passed: {self.test_results['tests_passed']}")
        print(f"❌ Failed: {self.test_results['tests_failed']}")
        print(f"🚨 Critical Failures: {len(self.test_results['critical_failures'])}")
        return self.test_results

    def save_results(self, output_file: str) -> None:
        """Save validation results to file."""
        with open(output_file, 'w') as f:
            yaml.dump(self.test_results, f, default_flow_style=False, sort_keys=False)
        print(f"📄 Results saved to: {output_file}")
def _unresolved_containers(results):
    """Return names of containers whose validation ended with status ERROR."""
    container_results = results.get('container_results') or {}
    return [name for name, outcome in container_results.items()
            if outcome.get('status') == 'ERROR']


def main():
    """Command-line entry point.

    Usage: ``MIGRATION_VALIDATION_TESTS.py <config_directory> [container_names...]``

    Runs the validations, writes a timestamped YAML report into the current
    directory, and exits with status 1 when any container failed or could not
    be validated at all (status ERROR).
    """
    if len(sys.argv) < 2:
        print("Usage: python3 MIGRATION_VALIDATION_TESTS.py <config_directory> [container_names...]")
        print("\nExample:")
        print(" python3 MIGRATION_VALIDATION_TESTS.py individual_configs/")
        print(" python3 MIGRATION_VALIDATION_TESTS.py individual_configs/ nextcloud homeassistant")
        sys.exit(1)

    config_dir = sys.argv[1]
    # No explicit names means "discover every container from the config files".
    container_names = sys.argv[2:] if len(sys.argv) > 2 else None

    validator = ContainerMigrationValidator(config_dir)
    results = validator.run_all_validations(container_names)

    # Save results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = f"migration_validation_results_{timestamp}.yaml"
    validator.save_results(results_file)

    exit_code = 0
    # Exit with error code if there are critical failures
    if results['critical_failures']:
        print(f"\n🚨 WARNING: {len(results['critical_failures'])} containers have critical failures!")
        for failure in results['critical_failures']:
            print(f" - {failure['container']}: {len(failure['failed_tests'])} failed tests")
        exit_code = 1

    # A container that could not be checked must not count as a success.
    errored = _unresolved_containers(results)
    if errored:
        print(f"\n🚨 WARNING: {len(errored)} containers could not be validated!")
        for name in errored:
            print(f" - {name}")
        exit_code = 1

    if exit_code:
        sys.exit(exit_code)


if __name__ == "__main__":
    main()