- Add MIGRATION_PLAYBOOK.md with detailed 4-phase migration strategy - Add FUTURE_PROOF_SCALABILITY_PLAN.md with end-state architecture - Add migration_scripts/ with automated migration tools: - Docker Swarm setup and configuration - Traefik v3 reverse proxy deployment - Service migration automation - Backup and validation scripts - Monitoring and security hardening - Add comprehensive discovery results and audit data - Include zero-downtime migration strategy with rollback capabilities This provides a complete world-class migration solution for converting from current infrastructure to Future-Proof Scalability architecture.
750 lines
33 KiB
Python
750 lines
33 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive Container Configuration Audit Tool
|
|
|
|
This tool extracts ALL container configuration details necessary for identical recreation.
|
|
It generates complete documentation, Docker Compose templates, and migration guides.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from collections import defaultdict
|
|
|
|
class ContainerConfigurationAuditor:
    """Audits discovered container data and generates migration artifacts.

    Reads ``docker inspect`` JSON dumps and compose files from a discovery
    tree, normalizes them, and produces compose templates, a migration
    checklist, gap analysis and summary reports.
    """

    def __init__(self, discovery_root: str):
        """Initialize the auditor for a discovery data directory.

        Args:
            discovery_root: Path to the directory tree produced by the
                discovery scripts (searched recursively).
        """
        self.discovery_root = Path(discovery_root)
        # Scratch lookups; not referenced elsewhere in this file.
        self.containers = {}
        self.compose_files = {}
        self.networks = {}
        self.volumes = {}
        # Aggregated findings, populated by audit_all_containers() and the
        # generate_*/identify_* helpers.
        self.audit_results = {
            'container_inventory': {},
            'compose_templates': {},
            'configuration_gaps': [],
            'migration_checklist': {},
            'security_configurations': {},
            'network_configurations': {},
            'volume_configurations': {},
            'device_mappings': {},
            'privileged_containers': [],
            'custom_settings': {}
        }
|
|
|
|
def discover_container_files(self) -> List[Path]:
|
|
"""Find all container JSON files in the discovery data."""
|
|
container_files = []
|
|
for path in self.discovery_root.rglob("container_*.json"):
|
|
container_files.append(path)
|
|
return container_files
|
|
|
|
def discover_compose_files(self) -> List[Path]:
|
|
"""Find all Docker Compose files in the discovery data."""
|
|
compose_files = []
|
|
for path in self.discovery_root.rglob("compose_file_*.yml"):
|
|
compose_files.append(path)
|
|
return compose_files
|
|
|
|
    def extract_container_config(self, container_file: Path) -> Optional[Dict[str, Any]]:
        """Extract comprehensive configuration from a container JSON file.

        The file is expected to hold ``docker inspect`` output (a JSON array
        with one element). Returns a normalized config dict, or None when the
        file is empty, not an array, or cannot be parsed.
        """
        try:
            with open(container_file, 'r') as f:
                container_data = json.load(f)

            # `docker inspect` always emits a JSON array, even for a single
            # container; anything else means the dump is malformed.
            if not isinstance(container_data, list) or len(container_data) == 0:
                return None

            container = container_data[0]  # Docker inspect returns array

            config = {
                'source_file': str(container_file),
                'host_system': self._extract_host_from_path(container_file),
                'container_id': container.get('Id', ''),
                # Inspect reports names with a leading '/'; strip it.
                'name': container.get('Name', '').lstrip('/'),
                'created': container.get('Created', ''),

                # Image Information
                'image': {
                    'tag': container.get('Config', {}).get('Image', ''),
                    'sha': container.get('Image', ''),
                    'platform': container.get('Platform', 'linux')
                },

                # Runtime Configuration (restart behavior, namespaces, privileges)
                'runtime': {
                    'restart_policy': container.get('HostConfig', {}).get('RestartPolicy', {}),
                    'privileged': container.get('HostConfig', {}).get('Privileged', False),
                    'network_mode': container.get('HostConfig', {}).get('NetworkMode', ''),
                    'pid_mode': container.get('HostConfig', {}).get('PidMode', ''),
                    'ipc_mode': container.get('HostConfig', {}).get('IpcMode', ''),
                    'uts_mode': container.get('HostConfig', {}).get('UTSMode', ''),
                    'user_ns_mode': container.get('HostConfig', {}).get('UsernsMode', ''),
                    'cgroup_ns_mode': container.get('HostConfig', {}).get('CgroupnsMode', ''),
                    'auto_remove': container.get('HostConfig', {}).get('AutoRemove', False)
                },

                # Environment Variables (sensitive values are redacted)
                'environment': self._extract_environment_vars(container),

                # Port Mappings
                'ports': self._extract_port_mappings(container),

                # Volume Mounts
                'volumes': self._extract_volume_mounts(container),

                # Network Settings
                'networks': self._extract_network_settings(container),

                # Resource Limits
                'resources': self._extract_resource_limits(container),

                # Security Settings
                'security': self._extract_security_settings(container),

                # Device Mappings
                'devices': self._extract_device_mappings(container),

                # Command and Entrypoint
                'execution': {
                    'entrypoint': container.get('Config', {}).get('Entrypoint'),
                    'cmd': container.get('Config', {}).get('Cmd'),
                    'working_dir': container.get('Config', {}).get('WorkingDir'),
                    'user': container.get('Config', {}).get('User'),
                    'stop_signal': container.get('Config', {}).get('StopSignal')
                },

                # Labels and Metadata
                'labels': container.get('Config', {}).get('Labels', {}),
                'compose_metadata': self._extract_compose_metadata(container)
            }

            return config

        except Exception as e:
            # Best-effort: a single corrupt file should not abort the audit.
            print(f"Error processing {container_file}: {e}")
            return None
|
|
|
|
def _extract_host_from_path(self, path: Path) -> str:
|
|
"""Extract host system name from file path."""
|
|
parts = str(path).split('/')
|
|
for part in parts:
|
|
if part.startswith('system_audit_'):
|
|
return part.replace('system_audit_', '').replace('_' + part.split('_')[-1], '')
|
|
return 'unknown'
|
|
|
|
def _extract_environment_vars(self, container: Dict) -> Dict[str, str]:
|
|
"""Extract environment variables with special handling for sensitive data."""
|
|
env_list = container.get('Config', {}).get('Env', [])
|
|
env_dict = {}
|
|
|
|
for env_var in env_list:
|
|
if '=' in env_var:
|
|
key, value = env_var.split('=', 1)
|
|
# Mark sensitive variables
|
|
if any(sensitive in key.upper() for sensitive in ['PASSWORD', 'SECRET', 'KEY', 'TOKEN', 'PASS']):
|
|
env_dict[key] = f"***SENSITIVE_VALUE*** ({value[:4]}...)" if len(value) > 4 else "***SENSITIVE***"
|
|
else:
|
|
env_dict[key] = value
|
|
else:
|
|
env_dict[env_var] = ""
|
|
|
|
return env_dict
|
|
|
|
def _extract_port_mappings(self, container: Dict) -> Dict[str, Any]:
|
|
"""Extract port mappings and exposed ports."""
|
|
port_bindings = container.get('HostConfig', {}).get('PortBindings', {})
|
|
exposed_ports = container.get('Config', {}).get('ExposedPorts', {})
|
|
network_ports = container.get('NetworkSettings', {}).get('Ports', {})
|
|
|
|
ports = {
|
|
'exposed': list(exposed_ports.keys()) if exposed_ports else [],
|
|
'bindings': {},
|
|
'published': {}
|
|
}
|
|
|
|
# Process port bindings
|
|
for container_port, bindings in port_bindings.items():
|
|
if bindings:
|
|
ports['bindings'][container_port] = [
|
|
{
|
|
'host_ip': binding.get('HostIp', '0.0.0.0'),
|
|
'host_port': binding.get('HostPort')
|
|
} for binding in bindings
|
|
]
|
|
|
|
# Process published ports from network settings
|
|
for container_port, bindings in network_ports.items():
|
|
if bindings:
|
|
ports['published'][container_port] = [
|
|
{
|
|
'host_ip': binding.get('HostIp', '0.0.0.0'),
|
|
'host_port': binding.get('HostPort')
|
|
} for binding in bindings
|
|
]
|
|
|
|
return ports
|
|
|
|
def _extract_volume_mounts(self, container: Dict) -> List[Dict[str, Any]]:
|
|
"""Extract volume mounts with full details."""
|
|
mounts = container.get('Mounts', [])
|
|
binds = container.get('HostConfig', {}).get('Binds', [])
|
|
|
|
volumes = []
|
|
|
|
# Process mounts from Mounts section (most detailed)
|
|
for mount in mounts:
|
|
volume = {
|
|
'type': mount.get('Type'),
|
|
'source': mount.get('Source'),
|
|
'destination': mount.get('Destination'),
|
|
'mode': mount.get('Mode'),
|
|
'rw': mount.get('RW'),
|
|
'propagation': mount.get('Propagation'),
|
|
'driver': mount.get('Driver'),
|
|
'name': mount.get('Name')
|
|
}
|
|
volumes.append(volume)
|
|
|
|
# Also capture bind mount strings for verification
|
|
bind_strings = binds if binds else []
|
|
|
|
return {
|
|
'detailed_mounts': volumes,
|
|
'bind_strings': bind_strings
|
|
}
|
|
|
|
def _extract_network_settings(self, container: Dict) -> Dict[str, Any]:
|
|
"""Extract comprehensive network configuration."""
|
|
networks = container.get('NetworkSettings', {}).get('Networks', {})
|
|
host_config = container.get('HostConfig', {})
|
|
|
|
network_config = {
|
|
'networks': {},
|
|
'dns': {
|
|
'nameservers': host_config.get('Dns', []),
|
|
'search_domains': host_config.get('DnsSearch', []),
|
|
'options': host_config.get('DnsOptions', [])
|
|
},
|
|
'extra_hosts': host_config.get('ExtraHosts', []),
|
|
'links': host_config.get('Links', []),
|
|
'publish_all_ports': host_config.get('PublishAllPorts', False)
|
|
}
|
|
|
|
# Process each network attachment
|
|
for network_name, network_info in networks.items():
|
|
network_config['networks'][network_name] = {
|
|
'ip_address': network_info.get('IPAddress'),
|
|
'ip_prefix_len': network_info.get('IPPrefixLen'),
|
|
'gateway': network_info.get('Gateway'),
|
|
'mac_address': network_info.get('MacAddress'),
|
|
'network_id': network_info.get('NetworkID'),
|
|
'endpoint_id': network_info.get('EndpointID'),
|
|
'aliases': network_info.get('Aliases', []),
|
|
'dns_names': network_info.get('DNSNames', []),
|
|
'ipv6_gateway': network_info.get('IPv6Gateway'),
|
|
'global_ipv6_address': network_info.get('GlobalIPv6Address'),
|
|
'ipam_config': network_info.get('IPAMConfig', {})
|
|
}
|
|
|
|
return network_config
|
|
|
|
def _extract_resource_limits(self, container: Dict) -> Dict[str, Any]:
|
|
"""Extract resource limits and constraints."""
|
|
host_config = container.get('HostConfig', {})
|
|
|
|
return {
|
|
'cpu': {
|
|
'shares': host_config.get('CpuShares', 0),
|
|
'period': host_config.get('CpuPeriod', 0),
|
|
'quota': host_config.get('CpuQuota', 0),
|
|
'realtime_period': host_config.get('CpuRealtimePeriod', 0),
|
|
'realtime_runtime': host_config.get('CpuRealtimeRuntime', 0),
|
|
'cpuset_cpus': host_config.get('CpusetCpus', ''),
|
|
'cpuset_mems': host_config.get('CpusetMems', ''),
|
|
'count': host_config.get('CpuCount', 0),
|
|
'percent': host_config.get('CpuPercent', 0)
|
|
},
|
|
'memory': {
|
|
'limit': host_config.get('Memory', 0),
|
|
'reservation': host_config.get('MemoryReservation', 0),
|
|
'swap': host_config.get('MemorySwap', 0),
|
|
'swappiness': host_config.get('MemorySwappiness'),
|
|
'oom_kill_disable': host_config.get('OomKillDisable')
|
|
},
|
|
'blkio': {
|
|
'weight': host_config.get('BlkioWeight', 0),
|
|
'weight_device': host_config.get('BlkioWeightDevice'),
|
|
'device_read_bps': host_config.get('BlkioDeviceReadBps'),
|
|
'device_write_bps': host_config.get('BlkioDeviceWriteBps'),
|
|
'device_read_iops': host_config.get('BlkioDeviceReadIOps'),
|
|
'device_write_iops': host_config.get('BlkioDeviceWriteIOps')
|
|
},
|
|
'io': {
|
|
'maximum_iops': host_config.get('IOMaximumIOps', 0),
|
|
'maximum_bandwidth': host_config.get('IOMaximumBandwidth', 0)
|
|
},
|
|
'pids_limit': host_config.get('PidsLimit'),
|
|
'ulimits': host_config.get('Ulimits'),
|
|
'shm_size': host_config.get('ShmSize', 67108864)
|
|
}
|
|
|
|
def _extract_security_settings(self, container: Dict) -> Dict[str, Any]:
|
|
"""Extract security-related settings."""
|
|
host_config = container.get('HostConfig', {})
|
|
|
|
return {
|
|
'apparmor_profile': container.get('AppArmorProfile'),
|
|
'security_opt': host_config.get('SecurityOpt', []),
|
|
'cap_add': host_config.get('CapAdd', []),
|
|
'cap_drop': host_config.get('CapDrop', []),
|
|
'group_add': host_config.get('GroupAdd', []),
|
|
'readonly_rootfs': host_config.get('ReadonlyRootfs', False),
|
|
'masked_paths': host_config.get('MaskedPaths', []),
|
|
'readonly_paths': host_config.get('ReadonlyPaths', []),
|
|
'no_new_privileges': host_config.get('NoNewPrivileges', False),
|
|
'oom_score_adj': host_config.get('OomScoreAdj', 0),
|
|
'runtime': host_config.get('Runtime', 'runc'),
|
|
'isolation': host_config.get('Isolation', ''),
|
|
'cgroup': host_config.get('Cgroup', ''),
|
|
'cgroup_parent': host_config.get('CgroupParent', '')
|
|
}
|
|
|
|
def _extract_device_mappings(self, container: Dict) -> List[Dict[str, Any]]:
|
|
"""Extract device mappings and hardware access."""
|
|
devices = container.get('HostConfig', {}).get('Devices', [])
|
|
device_requests = container.get('HostConfig', {}).get('DeviceRequests', [])
|
|
device_cgroup_rules = container.get('HostConfig', {}).get('DeviceCgroupRules', [])
|
|
|
|
return {
|
|
'devices': devices or [],
|
|
'device_requests': device_requests or [],
|
|
'device_cgroup_rules': device_cgroup_rules or []
|
|
}
|
|
|
|
def _extract_compose_metadata(self, container: Dict) -> Dict[str, Any]:
|
|
"""Extract Docker Compose related metadata from labels."""
|
|
labels = container.get('Config', {}).get('Labels', {})
|
|
compose_labels = {}
|
|
|
|
for key, value in labels.items():
|
|
if key.startswith('com.docker.compose.'):
|
|
clean_key = key.replace('com.docker.compose.', '')
|
|
compose_labels[clean_key] = value
|
|
|
|
return compose_labels
|
|
|
|
    def generate_compose_template(self, container_config: Dict[str, Any]) -> Dict[str, Any]:
        """Generate Docker Compose service definition from container config.

        Returns ``{service_name: service_dict}`` suitable for nesting under a
        compose file's ``services:`` key. Only settings that deviate from
        Docker defaults are emitted.
        """
        service_name = container_config['name']

        # Basic service definition
        service = {
            'image': container_config['image']['tag'],
            'container_name': service_name
        }

        # Restart policy ('no' is the Docker default, so it is omitted)
        restart_policy = container_config['runtime']['restart_policy'].get('Name', 'no')
        if restart_policy != 'no':
            service['restart'] = restart_policy

        # Environment variables. NOTE: sensitive values were redacted at
        # extraction time, so these may contain '***SENSITIVE***' markers.
        if container_config['environment']:
            service['environment'] = container_config['environment']

        # Port mappings in "HOST:CONTAINER" or "IP:HOST:CONTAINER" short syntax
        if container_config['ports']['bindings']:
            ports = []
            for container_port, bindings in container_config['ports']['bindings'].items():
                for binding in bindings:
                    host_port = binding['host_port']
                    host_ip = binding['host_ip']
                    if host_ip and host_ip != '0.0.0.0':
                        ports.append(f"{host_ip}:{host_port}:{container_port}")
                    else:
                        ports.append(f"{host_port}:{container_port}")
            if ports:
                service['ports'] = ports

        # Volume mounts: reuse the raw bind strings captured from HostConfig
        if container_config['volumes']['bind_strings']:
            service['volumes'] = container_config['volumes']['bind_strings']

        # Networks
        if len(container_config['networks']['networks']) > 0:
            networks = list(container_config['networks']['networks'].keys())
            # Strip compose-generated '<project>_<network>' names down to the
            # last segment; a bare 'default' network list is omitted entirely.
            clean_networks = [net.split('_')[-1] if '_' in net else net for net in networks]
            if clean_networks and clean_networks != ['default']:
                service['networks'] = clean_networks

        # Privileged mode
        if container_config['runtime']['privileged']:
            service['privileged'] = True

        # Device mappings in "HOST:CONTAINER:PERMISSIONS" form
        if container_config['devices']['devices']:
            devices = []
            for device in container_config['devices']['devices']:
                host_path = device['PathOnHost']
                container_path = device['PathInContainer']
                permissions = device.get('CgroupPermissions', 'rwm')
                devices.append(f"{host_path}:{container_path}:{permissions}")
            if devices:
                service['devices'] = devices

        # Security options
        if container_config['security']['security_opt']:
            service['security_opt'] = container_config['security']['security_opt']

        # Capabilities
        if container_config['security']['cap_add']:
            service['cap_add'] = container_config['security']['cap_add']
        if container_config['security']['cap_drop']:
            service['cap_drop'] = container_config['security']['cap_drop']

        # Working directory
        if container_config['execution']['working_dir']:
            service['working_dir'] = container_config['execution']['working_dir']

        # User
        if container_config['execution']['user']:
            service['user'] = container_config['execution']['user']

        # Command and entrypoint
        if container_config['execution']['cmd']:
            service['command'] = container_config['execution']['cmd']
        if container_config['execution']['entrypoint']:
            service['entrypoint'] = container_config['execution']['entrypoint']

        # Stop signal
        if container_config['execution']['stop_signal']:
            service['stop_signal'] = container_config['execution']['stop_signal']

        # Resource limits -> compose 'deploy.resources' section
        resources = container_config['resources']
        deploy_resources = {}

        if resources['memory']['limit'] > 0:
            # Raw byte count with a 'b' suffix.
            deploy_resources.setdefault('limits', {})['memory'] = f"{resources['memory']['limit']}b"
        if resources['memory']['reservation'] > 0:
            deploy_resources.setdefault('reservations', {})['memory'] = f"{resources['memory']['reservation']}b"
        if resources['cpu']['shares'] > 0:
            # NOTE(review): CpuShares is a relative weight (default 1024), not
            # a core count; shares/1024 only approximates a 'cpus' limit.
            deploy_resources.setdefault('limits', {})['cpus'] = str(resources['cpu']['shares'] / 1024)

        if deploy_resources:
            service['deploy'] = {'resources': deploy_resources}

        return {service_name: service}
|
|
|
|
    def audit_all_containers(self) -> None:
        """Perform comprehensive audit of all containers.

        Walks the discovery tree, parses every container inspect file,
        generates a compose template per container, and indexes privileged /
        device-using / security-customized containers for later reporting.
        Compose files found in the discovery data are stored verbatim.
        """
        print("🔍 Discovering container configurations...")

        container_files = self.discover_container_files()
        compose_files = self.discover_compose_files()

        print(f"Found {len(container_files)} container files")
        print(f"Found {len(compose_files)} compose files")

        # Process each container
        for container_file in container_files:
            print(f"Processing: {container_file.name}")
            config = self.extract_container_config(container_file)

            # extract_container_config returns None for unparsable files.
            if config:
                container_name = config['name']
                host = config['host_system']

                # Keys are 'host::container' so identically named containers
                # on different hosts do not collide.
                self.audit_results['container_inventory'][f"{host}::{container_name}"] = config

                # Generate compose template
                compose_template = self.generate_compose_template(config)
                self.audit_results['compose_templates'][f"{host}::{container_name}"] = compose_template

                # Track privileged containers
                if config['runtime']['privileged']:
                    self.audit_results['privileged_containers'].append(f"{host}::{container_name}")

                # Track device mappings
                if config['devices']['devices']:
                    self.audit_results['device_mappings'][f"{host}::{container_name}"] = config['devices']

                # Track security configurations that deviate from defaults
                if any([config['security']['security_opt'],
                        config['security']['cap_add'],
                        config['security']['cap_drop'],
                        config['security']['apparmor_profile'] != 'docker-default']):
                    self.audit_results['security_configurations'][f"{host}::{container_name}"] = config['security']

        # Process compose files (stored verbatim, distinguished from generated
        # templates by the 'compose::' key segment).
        for compose_file in compose_files:
            try:
                with open(compose_file, 'r') as f:
                    compose_data = yaml.safe_load(f)
                host = self._extract_host_from_path(compose_file)
                self.audit_results['compose_templates'][f"{host}::compose::{compose_file.name}"] = compose_data
            except Exception as e:
                # Best-effort: skip unreadable/invalid YAML rather than abort.
                print(f"Error reading compose file {compose_file}: {e}")
|
|
|
|
    def generate_migration_checklist(self) -> Dict[str, List[str]]:
        """Generate comprehensive migration checklist.

        Builds per-container task lists (data backup, secret migration,
        network creation, hardware availability, privileged-access review)
        keyed as ``'<container> - <category>'``.
        """
        checklist = defaultdict(list)

        for container_key, config in self.audit_results['container_inventory'].items():
            host, container_name = container_key.split('::', 1)

            # Data persistence checklist: anything mounted from the host,
            # excluding Docker-managed paths under /var/lib/docker.
            if config['volumes']['detailed_mounts']:
                checklist[f"{container_name} - Data Backup"].extend([
                    f"Backup volume: {mount['source']} -> {mount['destination']}"
                    for mount in config['volumes']['detailed_mounts']
                    if mount['source'] and not mount['source'].startswith('/var/lib/docker')
                ])

            # Environment variables: values were redacted at extraction time,
            # so sensitive ones are detected via the '***SENSITIVE' marker.
            if config['environment']:
                sensitive_vars = [k for k in config['environment'].keys()
                                  if 'SENSITIVE' in str(config['environment'][k])]
                if sensitive_vars:
                    checklist[f"{container_name} - Secrets"].append(
                        f"Securely migrate sensitive variables: {', '.join(sensitive_vars)}"
                    )

            # Network dependencies
            if config['networks']['networks']:
                checklist[f"{container_name} - Networks"].extend([
                    f"Create network: {net}" for net in config['networks']['networks'].keys()
                ])

            # Device dependencies
            if config['devices']['devices']:
                checklist[f"{container_name} - Hardware"].extend([
                    f"Ensure device available: {device['PathOnHost']}"
                    for device in config['devices']['devices']
                ])

            # Privileged access
            if config['runtime']['privileged']:
                checklist[f"{container_name} - Security"].append(
                    "Review privileged access requirements"
                )

        # Plain dict so the result serializes cleanly to YAML.
        return dict(checklist)
|
|
|
|
    def identify_configuration_gaps(self) -> List[Dict[str, Any]]:
        """Identify potential configuration gaps.

        Each gap is a dict with 'container', 'type', 'severity'
        ('high'/'medium'/'low'), 'description' and 'recommendation'.
        """
        gaps = []

        for container_key, config in self.audit_results['container_inventory'].items():
            host, container_name = container_key.split('::', 1)  # parsed for symmetry; not used below

            # Check for missing image tags.
            # NOTE(review): an image reference with no tag at all (e.g.
            # 'nginx') also implies :latest but is not flagged by this check.
            if config['image']['tag'] == 'latest' or ':latest' in config['image']['tag']:
                gaps.append({
                    'container': container_key,
                    'type': 'image_tag',
                    'severity': 'medium',
                    'description': 'Using :latest tag - should pin to specific version',
                    'recommendation': 'Replace with specific version tag'
                })

            # Check for containers with no restart policy
            if config['runtime']['restart_policy'].get('Name') == 'no':
                gaps.append({
                    'container': container_key,
                    'type': 'restart_policy',
                    'severity': 'low',
                    'description': 'No restart policy set',
                    'recommendation': 'Consider setting restart: unless-stopped'
                })

            # Privileged mode without device mappings suggests privilege was
            # granted as a blanket rather than for specific hardware access.
            if config['runtime']['privileged'] and not config['devices']['devices']:
                gaps.append({
                    'container': container_key,
                    'type': 'security',
                    'severity': 'high',
                    'description': 'Privileged mode without specific device mappings',
                    'recommendation': 'Review if privileged access is necessary'
                })

            # Check for bind mounts to system directories
            for mount in config['volumes']['detailed_mounts']:
                if mount['source'] and mount['source'].startswith('/'):
                    system_paths = ['/etc', '/var', '/usr', '/bin', '/sbin', '/lib']
                    if any(mount['source'].startswith(path) for path in system_paths):
                        gaps.append({
                            'container': container_key,
                            'type': 'volume_security',
                            'severity': 'medium',
                            'description': f'Bind mount to system directory: {mount["source"]}',
                            'recommendation': 'Verify this mount is necessary and secure'
                        })

        return gaps
|
|
|
|
def save_audit_results(self, output_dir: Path) -> None:
|
|
"""Save comprehensive audit results."""
|
|
output_dir.mkdir(exist_ok=True)
|
|
|
|
# Generate migration checklist
|
|
self.audit_results['migration_checklist'] = self.generate_migration_checklist()
|
|
|
|
# Identify configuration gaps
|
|
self.audit_results['configuration_gaps'] = self.identify_configuration_gaps()
|
|
|
|
# Save complete audit
|
|
with open(output_dir / 'COMPLETE_CONTAINER_AUDIT.yaml', 'w') as f:
|
|
yaml.dump(self.audit_results, f, default_flow_style=False, sort_keys=False)
|
|
|
|
# Save individual container configs
|
|
configs_dir = output_dir / 'individual_configs'
|
|
configs_dir.mkdir(exist_ok=True)
|
|
|
|
for container_key, config in self.audit_results['container_inventory'].items():
|
|
safe_name = container_key.replace('::', '_').replace('/', '_')
|
|
with open(configs_dir / f'{safe_name}_config.yaml', 'w') as f:
|
|
yaml.dump(config, f, default_flow_style=False)
|
|
|
|
# Save compose templates
|
|
compose_dir = output_dir / 'compose_templates'
|
|
compose_dir.mkdir(exist_ok=True)
|
|
|
|
for template_key, template in self.audit_results['compose_templates'].items():
|
|
if 'compose::' not in template_key: # Skip original compose files
|
|
safe_name = template_key.replace('::', '_').replace('/', '_')
|
|
with open(compose_dir / f'{safe_name}_compose.yml', 'w') as f:
|
|
yaml.dump({'services': template}, f, default_flow_style=False)
|
|
|
|
# Generate human-readable summary
|
|
self.generate_summary_report(output_dir)
|
|
|
|
    def generate_summary_report(self, output_dir: Path) -> None:
        """Generate human-readable summary report.

        Writes ``CONTAINER_AUDIT_SUMMARY.md`` into *output_dir*, covering
        overview counts, privileged containers, configuration gaps grouped by
        severity, migration tasks, required networks, and host data
        directories that need backing up.
        """
        report = []

        report.append("# COMPREHENSIVE CONTAINER CONFIGURATION AUDIT")
        report.append("=" * 50)
        report.append("")

        # Overview counts
        total_containers = len(self.audit_results['container_inventory'])
        privileged_count = len(self.audit_results['privileged_containers'])
        device_count = len(self.audit_results['device_mappings'])
        security_count = len(self.audit_results['security_configurations'])

        report.append(f"**Total Containers Analyzed:** {total_containers}")
        report.append(f"**Privileged Containers:** {privileged_count}")
        report.append(f"**Containers with Device Access:** {device_count}")
        report.append(f"**Containers with Custom Security:** {security_count}")
        report.append("")

        # Privileged containers section
        if self.audit_results['privileged_containers']:
            report.append("## PRIVILEGED CONTAINERS")
            report.append("These containers require special attention during migration:")
            report.append("")
            for container in self.audit_results['privileged_containers']:
                config = self.audit_results['container_inventory'][container]
                report.append(f"### {container}")
                report.append(f"- **Image:** {config['image']['tag']}")
                report.append(f"- **Host:** {config['host_system']}")
                if config['devices']['devices']:
                    report.append("- **Device Access:**")
                    for device in config['devices']['devices']:
                        report.append(f"  - {device['PathOnHost']} -> {device['PathInContainer']}")
                report.append("")

        # Configuration gaps, grouped and listed in descending severity
        if self.audit_results['configuration_gaps']:
            report.append("## CONFIGURATION GAPS & RECOMMENDATIONS")
            report.append("")

            gaps_by_severity = defaultdict(list)
            for gap in self.audit_results['configuration_gaps']:
                gaps_by_severity[gap['severity']].append(gap)

            for severity in ['high', 'medium', 'low']:
                if gaps_by_severity[severity]:
                    report.append(f"### {severity.upper()} Priority Issues")
                    for gap in gaps_by_severity[severity]:
                        report.append(f"- **{gap['container']}:** {gap['description']}")
                        report.append(f"  - *Recommendation:* {gap['recommendation']}")
                    report.append("")

        # Migration checklist summary
        if self.audit_results['migration_checklist']:
            report.append("## CRITICAL MIGRATION TASKS")
            report.append("")
            for task_category, tasks in self.audit_results['migration_checklist'].items():
                report.append(f"### {task_category}")
                for task in tasks:
                    report.append(f"- {task}")
                report.append("")

        # Network analysis: union of networks referenced by any container
        networks_found = set()
        for config in self.audit_results['container_inventory'].values():
            networks_found.update(config['networks']['networks'].keys())

        if networks_found:
            report.append("## REQUIRED NETWORKS")
            report.append("These Docker networks must be created:")
            report.append("")
            for network in sorted(networks_found):
                report.append(f"- {network}")
            report.append("")

        # Volume analysis: host paths outside Docker's own storage area
        volumes_found = set()
        for config in self.audit_results['container_inventory'].values():
            for mount in config['volumes']['detailed_mounts']:
                if mount['source'] and not mount['source'].startswith('/var/lib/docker'):
                    volumes_found.add(mount['source'])

        if volumes_found:
            report.append("## DATA DIRECTORIES TO BACKUP")
            report.append("These host directories contain persistent data:")
            report.append("")
            for volume in sorted(volumes_found):
                report.append(f"- {volume}")
            report.append("")

        # Save report
        with open(output_dir / 'CONTAINER_AUDIT_SUMMARY.md', 'w') as f:
            f.write('\n'.join(report))
|
|
|
|
def main():
    """CLI entry point: validate argv, run the audit, report output paths."""
    if len(sys.argv) != 2:
        print("Usage: python3 comprehensive_container_audit.py <discovery_root_directory>")
        sys.exit(1)

    root_arg = sys.argv[1]
    if not os.path.exists(root_arg):
        print(f"Error: Directory {root_arg} does not exist")
        sys.exit(1)

    print("🚀 Starting Comprehensive Container Configuration Audit...")
    print("=" * 60)

    auditor = ContainerConfigurationAuditor(root_arg)
    auditor.audit_all_containers()

    # Results land in a fixed subdirectory of the discovery root.
    results_dir = Path(root_arg) / 'container_audit_results'
    auditor.save_audit_results(results_dir)

    print("")
    print("✅ Audit Complete!")
    print(f"📊 Results saved to: {results_dir}")
    print(f"📋 Summary report: {results_dir}/CONTAINER_AUDIT_SUMMARY.md")
    print(f"🔧 Full audit data: {results_dir}/COMPLETE_CONTAINER_AUDIT.yaml")
    print(f"📁 Individual configs: {results_dir}/individual_configs/")
    print(f"🐳 Compose templates: {results_dir}/compose_templates/")
|
|
|
|
# Script entry point: delegate to main() only when executed directly.
if __name__ == "__main__":
    main()