Files
HomeAudit/comprehensive_discovery_results/extract_container_data.py
admin ef122ca019 Add comprehensive Future-Proof Scalability migration playbook and scripts
- Add MIGRATION_PLAYBOOK.md with detailed 4-phase migration strategy
- Add FUTURE_PROOF_SCALABILITY_PLAN.md with end-state architecture
- Add migration_scripts/ with automated migration tools:
  - Docker Swarm setup and configuration
  - Traefik v3 reverse proxy deployment
  - Service migration automation
  - Backup and validation scripts
  - Monitoring and security hardening
- Add comprehensive discovery results and audit data
- Include zero-downtime migration strategy with rollback capabilities

This provides a complete world-class migration solution for converting
from current infrastructure to Future-Proof Scalability architecture.
2025-08-24 13:18:47 -04:00

322 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Container Data Extraction Script
Parses all container JSON files and generates structured migration data
"""
import json
import os
import re
from collections import defaultdict
from pathlib import Path

import yaml
def extract_container_info(container_data):
    """Pull the migration-relevant fields out of one ``docker inspect`` payload.

    ``container_data`` may be the raw list that ``docker inspect`` emits or a
    single container dict; only the first entry of a list is used.
    Nanosecond health-check durations are converted to whole seconds.
    """
    container = container_data[0] if isinstance(container_data, list) else container_data
    config = container.get('Config', {})
    host_config = container.get('HostConfig', {})

    # Port mappings: "<container_port>" -> ["host_ip:host_port", ...]
    ports = {}
    for exposed_port, bindings in host_config.get('PortBindings', {}).items():
        if not bindings:
            continue
        ports[exposed_port] = [
            "{}:{}".format(b.get('HostIp', '0.0.0.0'), b.get('HostPort', ''))
            for b in bindings
        ]

    # Split mounts into named volumes and host bind mounts.
    volumes = []
    bind_mounts = []
    for mount in container.get('Mounts', []):
        mount_type = mount.get('Type')
        if mount_type == 'volume':
            volumes.append({
                'name': mount.get('Name'),
                'source': mount.get('Source'),
                'destination': mount.get('Destination'),
                'read_write': mount.get('RW', True)
            })
        elif mount_type == 'bind':
            bind_mounts.append({
                'source': mount.get('Source'),
                'destination': mount.get('Destination'),
                'read_write': mount.get('RW', True)
            })

    # Health check is only recorded when a Test command is actually configured.
    health = None
    raw_health = config.get('Healthcheck', {})
    if raw_health.get('Test'):
        health = {
            'test': raw_health.get('Test'),
            'interval': raw_health.get('Interval', 0) // 1000000000,  # ns -> s
            'timeout': raw_health.get('Timeout', 0) // 1000000000,
            'retries': raw_health.get('Retries', 0)
        }

    labels = config.get('Labels', {})

    # Compose encodes dependencies as "svc:condition:flag,svc:condition:flag";
    # only the service name (before the first ':') is kept.
    raw_depends = labels.get('com.docker.compose.depends_on', '')
    depends_on = []
    if raw_depends:
        depends_on = [entry.split(':')[0] for entry in raw_depends.split(',') if ':' in entry]

    return {
        'name': container.get('Name', '').lstrip('/'),
        'id': container.get('Id', '')[:12],  # short ID
        'image': config.get('Image', ''),
        'state': container.get('State', {}).get('Status', ''),
        'created': container.get('Created', ''),
        'ports': ports,
        'volumes': volumes,
        'bind_mounts': bind_mounts,
        'environment': config.get('Env', []),
        'network_mode': host_config.get('NetworkMode', ''),
        'networks': list(container.get('NetworkSettings', {}).get('Networks', {}).keys()),
        'restart_policy': host_config.get('RestartPolicy', {}).get('Name', ''),
        'health_check': health,
        'labels': labels,
        'depends_on': depends_on,
        'resource_limits': {
            'memory': host_config.get('Memory', 0),
            'cpu_shares': host_config.get('CpuShares', 0),
            'nano_cpus': host_config.get('NanoCpus', 0)
        }
    }
def categorize_service(container_name, image, labels):
    """Classify a container into a service category for migration planning.

    Matching is keyword-based and case-insensitive, mostly on the container
    name; databases are detected from the image name. Rule order matters:
    e.g. a 'nextcloud-db' container is cloud_storage, not database, because
    name rules are checked before the image rule. ``labels`` is currently
    unused but kept for interface stability.
    """
    # (field-to-search, keywords, category) — evaluated in order, first hit wins.
    rules = (
        ('name', ('jellyfin', 'plex', 'emby'), 'media_streaming'),
        ('name', ('sonarr', 'radarr', 'lidarr', 'bazarr'), 'media_management'),
        ('name', ('adguard', 'pihole', 'unbound'), 'dns_dhcp'),
        ('name', ('traefik', 'nginx', 'caddy'), 'reverse_proxy'),
        ('name', ('nextcloud', 'owncloud'), 'cloud_storage'),
        ('name', ('gitea', 'gitlab', 'forgejo'), 'code_repository'),
        ('name', ('paperless', 'docusaurus'), 'document_management'),
        ('name', ('homeassistant', 'home-assistant', 'hass'), 'home_automation'),
        ('name', ('esphome', 'zigbee', 'zwave'), 'iot_management'),
        ('name', ('mosquitto', 'mqtt'), 'messaging_broker'),
        ('image', ('postgres', 'mysql', 'mariadb', 'redis', 'mongo'), 'database'),
        ('name', ('portainer', 'watchtower', 'uptime', 'grafana', 'prometheus'), 'monitoring'),
        ('name', ('code-server', 'jupyter', 'appflowy'), 'development'),
        ('name', ('immich', 'photoprism'), 'photo_management'),
    )
    haystacks = {'name': container_name.lower(), 'image': image.lower()}
    for field, keywords, category in rules:
        text = haystacks[field]
        if any(keyword in text for keyword in keywords):
            return category
    return 'other'
def determine_migration_complexity(category, volumes, bind_mounts, depends_on):
    """Score a service and map the score to a migration-complexity label.

    The score sums a category weight (1-3) with per-aspect weights for
    volumes, bind mounts, and dependencies (0-2 each), then buckets into
    'low' / 'medium' / 'high' / 'very_high'.
    """
    heavy_categories = {
        'database', 'home_automation', 'photo_management',
        'cloud_storage', 'media_streaming'
    }
    moderate_categories = {'code_repository', 'document_management'}

    if category in heavy_categories:
        score = 3
    elif category in moderate_categories:
        score = 2
    else:
        score = 1

    def aspect_weight(count, heavy_limit):
        # 2 points above the heavy threshold, 1 point for any, 0 for none.
        if count > heavy_limit:
            return 2
        return 1 if count > 0 else 0

    score += aspect_weight(len(volumes), 2)
    score += aspect_weight(len(bind_mounts), 3)
    score += aspect_weight(len(depends_on), 2)

    for threshold, label in ((7, 'very_high'), (5, 'high'), (3, 'medium')):
        if score >= threshold:
            return label
    return 'low'
def _device_name_from_path(file_path):
    """Return the device name embedded in a 'system_audit_<device>_<ts>' path part.

    Falls back to 'unknown' when the path contains no audit directory.
    """
    for part in file_path.parts:
        if part.startswith('system_audit_'):
            device = part[len('system_audit_'):]
            # Strip any trailing '_YYYYMMDD_HHMMSS' run timestamp instead of
            # enumerating the specific timestamps of known audit runs (the
            # old chained .replace() calls broke on every new run).
            return re.sub(r'_\d{8}_\d{6}$', '', device)
    return 'unknown'


def _load_service_entry(file_path, device_name):
    """Parse one container_*.json inspect dump into an enriched service entry.

    Propagates open()/JSON errors to the caller, which reports them per-file
    without aborting the scan.
    """
    with open(file_path, 'r') as f:
        container_data = json.load(f)
    container_info = extract_container_info(container_data)
    category = categorize_service(
        container_info['name'],
        container_info['image'],
        container_info['labels']
    )
    complexity = determine_migration_complexity(
        category,
        container_info['volumes'],
        container_info['bind_mounts'],
        container_info['depends_on']
    )
    return {
        'device': device_name,
        'category': category,
        'migration_complexity': complexity,
        **container_info
    }


def _build_priority_summary(all_services):
    """Group services into the four migration-priority buckets.

    A service may land in more than one bucket (e.g. a critical service that
    is also high complexity).
    """
    priority_summary = {
        'critical_first': [],
        'high_complexity': [],
        'database_services': [],
        'standalone_services': []
    }
    for service in all_services.values():
        if service['category'] in ['dns_dhcp', 'home_automation']:
            priority_summary['critical_first'].append({
                'device': service['device'],
                'name': service['name'],
                'category': service['category']
            })
        if service['migration_complexity'] in ['high', 'very_high']:
            priority_summary['high_complexity'].append({
                'device': service['device'],
                'name': service['name'],
                'complexity': service['migration_complexity'],
                'volumes': len(service['volumes']),
                'dependencies': len(service['depends_on'])
            })
        if service['category'] == 'database':
            priority_summary['database_services'].append({
                'device': service['device'],
                'name': service['name'],
                'image': service['image']
            })
        if not service['depends_on'] and service['migration_complexity'] in ['low', 'medium']:
            priority_summary['standalone_services'].append({
                'device': service['device'],
                'name': service['name'],
                'category': service['category']
            })
    return priority_summary


def main():
    """Scan for container inspect dumps and emit the migration YAML reports.

    Writes detailed_container_inventory.yaml and
    migration_priority_summary.yaml into the current directory, then prints
    category and complexity distribution summaries.
    """
    container_files = list(Path('.').glob('**/container_*.json'))
    device_containers = defaultdict(list)
    all_services = {}
    category_stats = defaultdict(int)
    print(f"Found {len(container_files)} container files")

    for file_path in container_files:
        device_name = _device_name_from_path(file_path)
        try:
            service_entry = _load_service_entry(file_path, device_name)
        except Exception as e:
            # Best-effort: report the bad file and keep scanning so one
            # malformed dump doesn't abort the whole inventory.
            print(f"Error processing {file_path}: {e}")
            continue
        device_containers[device_name].append(service_entry)
        all_services[f"{device_name}_{service_entry['name']}"] = service_entry
        category_stats[service_entry['category']] += 1

    # Full inventory, grouped both per-device and per-category.
    migration_data = {
        'summary': {
            'total_containers': len(all_services),
            'devices': len(device_containers),
            'categories': dict(category_stats)
        },
        'devices': dict(device_containers),
        'by_category': defaultdict(list)
    }
    for service in all_services.values():
        migration_data['by_category'][service['category']].append({
            'device': service['device'],
            'name': service['name'],
            'image': service['image'],
            'complexity': service['migration_complexity'],
            'ports': service['ports'],
            'volumes': len(service['volumes']),
            'bind_mounts': len(service['bind_mounts']),
            'dependencies': service['depends_on']
        })
    # defaultdict does not round-trip cleanly through YAML; flatten it.
    migration_data['by_category'] = dict(migration_data['by_category'])

    with open('detailed_container_inventory.yaml', 'w') as f:
        yaml.dump(migration_data, f, default_flow_style=False, sort_keys=True)

    priority_summary = _build_priority_summary(all_services)
    with open('migration_priority_summary.yaml', 'w') as f:
        yaml.dump(priority_summary, f, default_flow_style=False, sort_keys=True)

    print(f"✓ Processed {len(all_services)} containers across {len(device_containers)} devices")
    print("✓ Generated detailed_container_inventory.yaml")
    print("✓ Generated migration_priority_summary.yaml")

    print("\nCategory Distribution:")
    for category, count in sorted(category_stats.items()):
        print(f" {category}: {count}")

    complexity_stats = defaultdict(int)
    for service in all_services.values():
        complexity_stats[service['migration_complexity']] += 1
    print("\nComplexity Distribution:")
    for complexity, count in sorted(complexity_stats.items()):
        print(f" {complexity}: {count}")


if __name__ == '__main__':
    main()