#!/usr/bin/env python3
"""
Comprehensive Container Configuration Audit Tool

This tool extracts ALL container configuration details necessary for identical recreation.
It generates complete documentation, Docker Compose templates, and migration guides.
"""

import json
import os
import sys
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from collections import defaultdict

try:
    import yaml
except ImportError:
    # PyYAML is the only third-party dependency. Keep the extraction logic
    # importable without it and fail with a clear message where YAML is used.
    yaml = None


# Substrings (matched against the upper-cased variable name) that mark an
# environment variable as sensitive.
_SENSITIVE_MARKERS = ('PASSWORD', 'SECRET', 'KEY', 'TOKEN', 'PASS')
# Prefix shared by both placeholders written in place of sensitive values.
_SENSITIVE_PREFIX = '***SENSITIVE'
# Networks Docker provides itself; they are selected with `network_mode`
# in Compose and never have to be created.
_BUILTIN_NETWORKS = ('bridge', 'host', 'none')
# Host directories whose bind mounts are reported as a configuration gap.
_SYSTEM_PATHS = ('/etc', '/var', '/usr', '/bin', '/sbin', '/lib')
# Mount sources below this directory are not listed as host data to back up.
_DOCKER_DATA_ROOT = '/var/lib/docker'
# Period (microseconds) assumed when a CPU quota is set without a period.
_DEFAULT_CPU_PERIOD_US = 100000


def _require_yaml():
    """Return the PyYAML module, raising RuntimeError if it is not installed."""
    if yaml is None:
        raise RuntimeError("PyYAML is required (pip install pyyaml)")
    return yaml


def _section(mapping: Any, key: str) -> Dict[str, Any]:
    """Return ``mapping[key]`` when it is a dict, otherwise an empty dict.

    Inspect output may contain explicit ``null`` values, for which
    ``dict.get(key, {})`` returns ``None`` instead of the default.
    """
    value = mapping.get(key) if isinstance(mapping, dict) else None
    return value if isinstance(value, dict) else {}


class ContainerConfigurationAuditor:
    """Audit container inspect dumps and Compose files below a directory.

    Results accumulate in ``self.audit_results``; ``save_audit_results``
    writes them out as YAML plus a Markdown summary.
    """

    def __init__(self, discovery_root: str):
        self.discovery_root = Path(discovery_root)
        self.containers = {}
        self.compose_files = {}
        self.networks = {}
        self.volumes = {}
        self.audit_results = {
            'container_inventory': {},
            'compose_templates': {},
            'configuration_gaps': [],
            'migration_checklist': {},
            'security_configurations': {},
            'network_configurations': {},
            'volume_configurations': {},
            'device_mappings': {},
            'privileged_containers': [],
            'custom_settings': {}
        }

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------

    def discover_container_files(self) -> List[Path]:
        """Find all container JSON files in the discovery data (sorted)."""
        return sorted(self.discovery_root.rglob("container_*.json"))

    def discover_compose_files(self) -> List[Path]:
        """Find all Docker Compose files in the discovery data (sorted)."""
        return sorted(self.discovery_root.rglob("compose_file_*.yml"))

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def extract_container_config(self, container_file: Path) -> Optional[Dict[str, Any]]:
        """Extract comprehensive configuration from a container JSON file.

        The file must hold a non-empty JSON array whose first element is the
        container object. Returns ``None`` (after printing the error) when
        the file cannot be read, parsed, or does not have that shape.
        """
        try:
            with open(container_file, 'r', encoding='utf-8') as f:
                container_data = json.load(f)

            if not isinstance(container_data, list) or not container_data:
                return None

            container = container_data[0]  # Docker inspect returns array
            if not isinstance(container, dict):
                return None

            return self._config_from_inspect(container, container_file)
        except (OSError, ValueError, AttributeError, TypeError, KeyError) as e:
            # Best effort: one malformed dump must not abort the whole audit.
            print(f"Error processing {container_file}: {e}")
            return None

    def _config_from_inspect(self, container: Dict[str, Any],
                             container_file: Path) -> Dict[str, Any]:
        """Build the audit record for one already-parsed inspect object."""
        cfg = _section(container, 'Config')
        host_config = _section(container, 'HostConfig')

        return {
            'source_file': str(container_file),
            'host_system': self._extract_host_from_path(container_file),
            'container_id': container.get('Id', ''),
            'name': (container.get('Name') or '').lstrip('/'),
            'created': container.get('Created', ''),

            # Image Information
            'image': {
                'tag': cfg.get('Image', ''),
                'sha': container.get('Image', ''),
                'platform': container.get('Platform', 'linux')
            },

            # Runtime Configuration
            'runtime': {
                'restart_policy': _section(host_config, 'RestartPolicy'),
                'privileged': host_config.get('Privileged', False),
                'network_mode': host_config.get('NetworkMode', ''),
                'pid_mode': host_config.get('PidMode', ''),
                'ipc_mode': host_config.get('IpcMode', ''),
                'uts_mode': host_config.get('UTSMode', ''),
                'user_ns_mode': host_config.get('UsernsMode', ''),
                'cgroup_ns_mode': host_config.get('CgroupnsMode', ''),
                'auto_remove': host_config.get('AutoRemove', False)
            },

            'environment': self._extract_environment_vars(container),
            'ports': self._extract_port_mappings(container),
            'volumes': self._extract_volume_mounts(container),
            'networks': self._extract_network_settings(container),
            'resources': self._extract_resource_limits(container),
            'security': self._extract_security_settings(container),
            'devices': self._extract_device_mappings(container),

            # Command and Entrypoint
            'execution': {
                'entrypoint': cfg.get('Entrypoint'),
                'cmd': cfg.get('Cmd'),
                'working_dir': cfg.get('WorkingDir'),
                'user': cfg.get('User'),
                'stop_signal': cfg.get('StopSignal')
            },

            # Labels and Metadata
            'labels': cfg.get('Labels') or {},
            'compose_metadata': self._extract_compose_metadata(container)
        }

    def _extract_host_from_path(self, path: Path) -> str:
        """Extract host system name from file path.

        Looks for a path component named ``system_audit_<host>_<suffix>`` and
        returns ``<host>``: the prefix is removed and, if an underscore
        remains, so is the final underscore-separated token. Returns
        ``'unknown'`` when no component yields a non-empty name.
        """
        prefix = 'system_audit_'
        for part in Path(path).parts:
            if not part.startswith(prefix):
                continue
            remainder = part[len(prefix):]
            head, sep, _suffix = remainder.rpartition('_')
            host = head if sep else remainder
            if host:
                return host
        return 'unknown'

    def _extract_environment_vars(self, container: Dict) -> Dict[str, str]:
        """Extract environment variables with special handling for sensitive data.

        Values of variables whose name contains one of the sensitive markers
        are replaced by a placeholder. NOTE: for values longer than four
        characters the placeholder still shows the first four characters.
        """
        env_list = _section(container, 'Config').get('Env') or []
        env_dict = {}

        for env_var in env_list:
            key, sep, value = env_var.partition('=')
            if not sep:
                env_dict[env_var] = ""
                continue

            # Mark sensitive variables
            if any(marker in key.upper() for marker in _SENSITIVE_MARKERS):
                if len(value) > 4:
                    env_dict[key] = f"***SENSITIVE_VALUE*** ({value[:4]}...)"
                else:
                    env_dict[key] = "***SENSITIVE***"
            else:
                env_dict[key] = value

        return env_dict

    @staticmethod
    def _normalize_bindings(port_map: Optional[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Convert a ``{container_port: [binding, ...]}`` map, skipping empty entries."""
        normalized = {}
        for container_port, bindings in (port_map or {}).items():
            if bindings:
                normalized[container_port] = [
                    {
                        'host_ip': binding.get('HostIp', '0.0.0.0'),
                        'host_port': binding.get('HostPort')
                    }
                    for binding in bindings
                ]
        return normalized

    def _extract_port_mappings(self, container: Dict) -> Dict[str, Any]:
        """Extract port mappings and exposed ports."""
        exposed_ports = _section(container, 'Config').get('ExposedPorts')
        return {
            'exposed': list(exposed_ports.keys()) if exposed_ports else [],
            # Requested bindings from HostConfig.PortBindings
            'bindings': self._normalize_bindings(
                _section(container, 'HostConfig').get('PortBindings')),
            # Published ports from NetworkSettings.Ports
            'published': self._normalize_bindings(
                _section(container, 'NetworkSettings').get('Ports'))
        }

    def _extract_volume_mounts(self, container: Dict) -> Dict[str, Any]:
        """Extract volume mounts with full details.

        Returns a dict with ``detailed_mounts`` (one record per entry of the
        ``Mounts`` section) and ``bind_strings`` (``HostConfig.Binds`` as-is).
        """
        mounts = container.get('Mounts') or []
        binds = _section(container, 'HostConfig').get('Binds')

        detailed = [
            {
                'type': mount.get('Type'),
                'source': mount.get('Source'),
                'destination': mount.get('Destination'),
                'mode': mount.get('Mode'),
                'rw': mount.get('RW'),
                'propagation': mount.get('Propagation'),
                'driver': mount.get('Driver'),
                'name': mount.get('Name')
            }
            for mount in mounts
        ]

        return {
            'detailed_mounts': detailed,
            # Also capture bind mount strings for verification
            'bind_strings': binds if binds else []
        }

    def _extract_network_settings(self, container: Dict) -> Dict[str, Any]:
        """Extract comprehensive network configuration."""
        networks = _section(_section(container, 'NetworkSettings'), 'Networks')
        host_config = _section(container, 'HostConfig')

        network_config = {
            'networks': {},
            'dns': {
                'nameservers': host_config.get('Dns', []),
                'search_domains': host_config.get('DnsSearch', []),
                'options': host_config.get('DnsOptions', [])
            },
            'extra_hosts': host_config.get('ExtraHosts', []),
            'links': host_config.get('Links', []),
            'publish_all_ports': host_config.get('PublishAllPorts', False)
        }

        # Process each network attachment
        for network_name, network_info in networks.items():
            network_info = network_info or {}
            network_config['networks'][network_name] = {
                'ip_address': network_info.get('IPAddress'),
                'ip_prefix_len': network_info.get('IPPrefixLen'),
                'gateway': network_info.get('Gateway'),
                'mac_address': network_info.get('MacAddress'),
                'network_id': network_info.get('NetworkID'),
                'endpoint_id': network_info.get('EndpointID'),
                'aliases': network_info.get('Aliases', []),
                'dns_names': network_info.get('DNSNames', []),
                'ipv6_gateway': network_info.get('IPv6Gateway'),
                'global_ipv6_address': network_info.get('GlobalIPv6Address'),
                'ipam_config': network_info.get('IPAMConfig', {})
            }

        return network_config

    def _extract_resource_limits(self, container: Dict) -> Dict[str, Any]:
        """Extract resource limits and constraints.

        Numeric limits that are missing or null are reported as ``0`` so that
        they can be compared safely later on.
        """
        host_config = _section(container, 'HostConfig')

        def number(key: str):
            return host_config.get(key) or 0

        return {
            'cpu': {
                'shares': number('CpuShares'),
                'period': number('CpuPeriod'),
                'quota': number('CpuQuota'),
                'nano_cpus': number('NanoCpus'),
                'realtime_period': number('CpuRealtimePeriod'),
                'realtime_runtime': number('CpuRealtimeRuntime'),
                'cpuset_cpus': host_config.get('CpusetCpus', ''),
                'cpuset_mems': host_config.get('CpusetMems', ''),
                'count': number('CpuCount'),
                'percent': number('CpuPercent')
            },
            'memory': {
                'limit': number('Memory'),
                'reservation': number('MemoryReservation'),
                'swap': number('MemorySwap'),
                'swappiness': host_config.get('MemorySwappiness'),
                'oom_kill_disable': host_config.get('OomKillDisable')
            },
            'blkio': {
                'weight': number('BlkioWeight'),
                'weight_device': host_config.get('BlkioWeightDevice'),
                'device_read_bps': host_config.get('BlkioDeviceReadBps'),
                'device_write_bps': host_config.get('BlkioDeviceWriteBps'),
                'device_read_iops': host_config.get('BlkioDeviceReadIOps'),
                'device_write_iops': host_config.get('BlkioDeviceWriteIOps')
            },
            'io': {
                'maximum_iops': number('IOMaximumIOps'),
                'maximum_bandwidth': number('IOMaximumBandwidth')
            },
            'pids_limit': host_config.get('PidsLimit'),
            'ulimits': host_config.get('Ulimits'),
            'shm_size': host_config.get('ShmSize', 67108864)
        }

    def _extract_security_settings(self, container: Dict) -> Dict[str, Any]:
        """Extract security-related settings."""
        host_config = _section(container, 'HostConfig')

        return {
            'apparmor_profile': container.get('AppArmorProfile'),
            'security_opt': host_config.get('SecurityOpt', []),
            'cap_add': host_config.get('CapAdd', []),
            'cap_drop': host_config.get('CapDrop', []),
            'group_add': host_config.get('GroupAdd', []),
            'readonly_rootfs': host_config.get('ReadonlyRootfs', False),
            'masked_paths': host_config.get('MaskedPaths', []),
            'readonly_paths': host_config.get('ReadonlyPaths', []),
            'no_new_privileges': host_config.get('NoNewPrivileges', False),
            'oom_score_adj': host_config.get('OomScoreAdj', 0),
            'runtime': host_config.get('Runtime', 'runc'),
            'isolation': host_config.get('Isolation', ''),
            'cgroup': host_config.get('Cgroup', ''),
            'cgroup_parent': host_config.get('CgroupParent', '')
        }

    def _extract_device_mappings(self, container: Dict) -> Dict[str, Any]:
        """Extract device mappings and hardware access."""
        host_config = _section(container, 'HostConfig')

        return {
            'devices': host_config.get('Devices') or [],
            'device_requests': host_config.get('DeviceRequests') or [],
            'device_cgroup_rules': host_config.get('DeviceCgroupRules') or []
        }

    def _extract_compose_metadata(self, container: Dict) -> Dict[str, Any]:
        """Extract Docker Compose related metadata from labels.

        Returns the ``com.docker.compose.*`` labels keyed by the part of the
        label name that follows that prefix.
        """
        prefix = 'com.docker.compose.'
        labels = _section(_section(container, 'Config'), 'Labels')
        return {
            key[len(prefix):]: value
            for key, value in labels.items()
            if key.startswith(prefix)
        }

    # ------------------------------------------------------------------
    # Compose template generation
    # ------------------------------------------------------------------

    @staticmethod
    def _format_port(host_ip: Optional[str], host_port: Any, container_port: str) -> str:
        """Format one binding as a Compose short-syntax port string.

        A missing host port yields a container-only mapping, and an IPv6
        host address is wrapped in brackets so its colons stay unambiguous.
        """
        host_port = '' if host_port is None else str(host_port)
        if host_ip and host_ip != '0.0.0.0':
            if ':' in host_ip and not host_ip.startswith('['):
                host_ip = f"[{host_ip}]"
            return f"{host_ip}:{host_port}:{container_port}"
        if host_port:
            return f"{host_port}:{container_port}"
        return container_port

    @staticmethod
    def _compose_network_names(network_names: List[str], project: Optional[str]) -> List[str]:
        """Map attached network names to the names used in a Compose service.

        Built-in networks are dropped and only the ``<project>_`` prefix that
        Compose itself adds is stripped; other underscores are left alone.
        """
        prefix = f"{project}_" if project else ''
        names = []
        for name in network_names:
            if name in _BUILTIN_NETWORKS:
                continue
            if prefix and name.startswith(prefix) and len(name) > len(prefix):
                name = name[len(prefix):]
            names.append(name)
        return names

    def generate_compose_template(self, container_config: Dict[str, Any]) -> Dict[str, Any]:
        """Generate Docker Compose service definition from container config.

        Args:
            container_config: A record as produced by
                ``extract_container_config``.

        Returns:
            ``{service_name: service_definition}``.
        """
        service_name = container_config['name']
        runtime = container_config['runtime']
        security = container_config['security']
        execution = container_config['execution']

        # Basic service definition
        service = {
            'image': container_config['image']['tag'],
            'container_name': service_name
        }

        # Restart policy (an empty name means "no"; keep the retry count)
        policy = runtime['restart_policy'] or {}
        policy_name = policy.get('Name') or 'no'
        if policy_name != 'no':
            retries = policy.get('MaximumRetryCount') or 0
            if policy_name == 'on-failure' and retries > 0:
                service['restart'] = f"on-failure:{retries}"
            else:
                service['restart'] = policy_name

        # Environment variables
        if container_config['environment']:
            service['environment'] = container_config['environment']

        # Port mappings
        ports = [
            self._format_port(binding['host_ip'], binding['host_port'], container_port)
            for container_port, bindings in container_config['ports']['bindings'].items()
            for binding in bindings
        ]
        if ports:
            service['ports'] = ports

        # Volume mounts
        if container_config['volumes']['bind_strings']:
            service['volumes'] = container_config['volumes']['bind_strings']

        # Networks: user-defined networks go under `networks`; a container that
        # is only on a built-in network is expressed with `network_mode`.
        attached = list(container_config['networks']['networks'])
        project = (container_config.get('compose_metadata') or {}).get('project')
        user_networks = self._compose_network_names(attached, project)
        if user_networks:
            if user_networks != ['default']:
                service['networks'] = user_networks
        else:
            network_mode = runtime.get('network_mode')
            if network_mode not in _BUILTIN_NETWORKS:
                network_mode = next(
                    (name for name in attached if name in _BUILTIN_NETWORKS), None)
            if network_mode:
                service['network_mode'] = network_mode

        # Privileged mode
        if runtime['privileged']:
            service['privileged'] = True

        # Device mappings
        devices = [
            f"{device['PathOnHost']}:{device['PathInContainer']}:"
            f"{device.get('CgroupPermissions', 'rwm')}"
            for device in container_config['devices']['devices']
        ]
        if devices:
            service['devices'] = devices

        # Security options and capabilities
        for key in ('security_opt', 'cap_add', 'cap_drop'):
            if security[key]:
                service[key] = security[key]

        # Working directory, user, command, entrypoint, stop signal
        for source_key, service_key in (
            ('working_dir', 'working_dir'),
            ('user', 'user'),
            ('cmd', 'command'),
            ('entrypoint', 'entrypoint'),
            ('stop_signal', 'stop_signal'),
        ):
            if execution[source_key]:
                service[service_key] = execution[source_key]

        # Resource limits
        resources = container_config['resources']
        memory = resources['memory']
        cpu = resources['cpu']
        deploy_resources = {}

        memory_limit = memory.get('limit') or 0
        if memory_limit > 0:
            deploy_resources.setdefault('limits', {})['memory'] = f"{memory_limit}b"

        memory_reservation = memory.get('reservation') or 0
        if memory_reservation > 0:
            deploy_resources.setdefault('reservations', {})['memory'] = f"{memory_reservation}b"

        # A hard CPU limit comes from NanoCpus or from quota/period.
        nano_cpus = cpu.get('nano_cpus') or 0
        cpu_quota = cpu.get('quota') or 0
        if nano_cpus > 0:
            deploy_resources.setdefault('limits', {})['cpus'] = str(nano_cpus / 1e9)
        elif cpu_quota > 0:
            cpu_period = cpu.get('period') or _DEFAULT_CPU_PERIOD_US
            deploy_resources.setdefault('limits', {})['cpus'] = str(cpu_quota / cpu_period)

        # CPU shares are a relative weight, not a number of CPUs.
        cpu_shares = cpu.get('shares') or 0
        if cpu_shares > 0:
            service['cpu_shares'] = cpu_shares

        if deploy_resources:
            service['deploy'] = {'resources': deploy_resources}

        return {service_name: service}

    # ------------------------------------------------------------------
    # Audit
    # ------------------------------------------------------------------

    @staticmethod
    def _has_custom_security(security: Dict[str, Any]) -> bool:
        """Return True when security options, capabilities or a non-default
        AppArmor profile are configured. A missing or empty profile counts as
        default."""
        custom_profile = security['apparmor_profile'] not in (None, '', 'docker-default')
        return bool(
            security['security_opt']
            or security['cap_add']
            or security['cap_drop']
            or custom_profile
        )

    def audit_all_containers(self) -> None:
        """Perform comprehensive audit of all containers.

        Fills ``container_inventory``, ``compose_templates``,
        ``privileged_containers``, ``device_mappings`` and
        ``security_configurations`` in ``self.audit_results``. Discovered
        Compose files are stored under ``<host>::compose::<filename>`` keys.

        Raises:
            RuntimeError: If Compose files were found but PyYAML is missing.
        """
        print("🔍 Discovering container configurations...")

        container_files = self.discover_container_files()
        compose_files = self.discover_compose_files()

        print(f"Found {len(container_files)} container files")
        print(f"Found {len(compose_files)} compose files")

        results = self.audit_results

        # Process each container
        for container_file in container_files:
            print(f"Processing: {container_file.name}")

            config = self.extract_container_config(container_file)
            if not config:
                continue

            key = f"{config['host_system']}::{config['name']}"
            results['container_inventory'][key] = config

            # Generate compose template
            results['compose_templates'][key] = self.generate_compose_template(config)

            # Track privileged containers
            if config['runtime']['privileged']:
                results['privileged_containers'].append(key)

            # Track device mappings
            if config['devices']['devices']:
                results['device_mappings'][key] = config['devices']

            # Track security configurations
            if self._has_custom_security(config['security']):
                results['security_configurations'][key] = config['security']

        # Process compose files
        if not compose_files:
            return
        yaml_module = _require_yaml()
        for compose_file in compose_files:
            try:
                with open(compose_file, 'r', encoding='utf-8') as f:
                    compose_data = yaml_module.safe_load(f)
            except (OSError, ValueError, yaml_module.YAMLError) as e:
                print(f"Error reading compose file {compose_file}: {e}")
                continue

            host = self._extract_host_from_path(compose_file)
            results['compose_templates'][f"{host}::compose::{compose_file.name}"] = compose_data

    def generate_migration_checklist(self) -> Dict[str, List[str]]:
        """Generate comprehensive migration checklist.

        Returns a mapping of ``"<container> - <category>"`` to task strings.
        Categories without tasks are omitted.
        """
        checklist = defaultdict(list)

        for container_key, config in self.audit_results['container_inventory'].items():
            _host, container_name = container_key.split('::', 1)

            # Data persistence checklist
            backups = [
                f"Backup volume: {mount['source']} -> {mount['destination']}"
                for mount in config['volumes']['detailed_mounts']
                if mount['source'] and not mount['source'].startswith(_DOCKER_DATA_ROOT)
            ]
            if backups:
                checklist[f"{container_name} - Data Backup"].extend(backups)

            # Environment variables: masked values start with the placeholder
            sensitive_vars = [
                name for name, value in config['environment'].items()
                if str(value).startswith(_SENSITIVE_PREFIX)
            ]
            if sensitive_vars:
                checklist[f"{container_name} - Secrets"].append(
                    f"Securely migrate sensitive variables: {', '.join(sensitive_vars)}"
                )

            # Network dependencies (built-in networks never need creating)
            network_tasks = [
                f"Create network: {net}"
                for net in config['networks']['networks']
                if net not in _BUILTIN_NETWORKS
            ]
            if network_tasks:
                checklist[f"{container_name} - Networks"].extend(network_tasks)

            # Device dependencies
            device_tasks = [
                f"Ensure device available: {device['PathOnHost']}"
                for device in config['devices']['devices']
            ]
            if device_tasks:
                checklist[f"{container_name} - Hardware"].extend(device_tasks)

            # Privileged access
            if config['runtime']['privileged']:
                checklist[f"{container_name} - Security"].append(
                    "Review privileged access requirements"
                )

        return dict(checklist)

    @staticmethod
    def _uses_floating_tag(image_ref: str) -> bool:
        """Return True when *image_ref* is untagged or tagged ``latest*``.

        A ``:`` before the last ``/`` is a registry port, not a tag.
        References pinned by digest and bare hexadecimal image IDs are
        treated as pinned.
        """
        if not image_ref or '@' in image_ref:
            return False
        if re.fullmatch(r'[0-9a-f]{12,64}', image_ref):
            return False
        _name, sep, tag = image_ref.rsplit('/', 1)[-1].partition(':')
        return not sep or tag.startswith('latest')

    @staticmethod
    def _is_system_path(source: str) -> bool:
        """Return True when *source* is one of the system directories or below one."""
        return any(
            source == path or source.startswith(path + '/')
            for path in _SYSTEM_PATHS
        )

    def identify_configuration_gaps(self) -> List[Dict[str, Any]]:
        """Identify potential configuration gaps.

        Each gap is a dict with ``container``, ``type``, ``severity``,
        ``description`` and ``recommendation`` keys.
        """
        gaps = []

        for container_key, config in self.audit_results['container_inventory'].items():
            # Check for missing image tags
            if self._uses_floating_tag(config['image']['tag']):
                gaps.append({
                    'container': container_key,
                    'type': 'image_tag',
                    'severity': 'medium',
                    'description': 'Using :latest tag - should pin to specific version',
                    'recommendation': 'Replace with specific version tag'
                })

            # Check for containers with no restart policy
            policy = config['runtime']['restart_policy'] or {}
            if (policy.get('Name') or 'no') == 'no':
                gaps.append({
                    'container': container_key,
                    'type': 'restart_policy',
                    'severity': 'low',
                    'description': 'No restart policy set',
                    'recommendation': 'Consider setting restart: unless-stopped'
                })

            # Check for potential security issues
            if config['runtime']['privileged'] and not config['devices']['devices']:
                gaps.append({
                    'container': container_key,
                    'type': 'security',
                    'severity': 'high',
                    'description': 'Privileged mode without specific device mappings',
                    'recommendation': 'Review if privileged access is necessary'
                })

            # Check for bind mounts to system directories. Named volumes are
            # skipped: their source is inside Docker's own data directory.
            for mount in config['volumes']['detailed_mounts']:
                source = mount['source']
                if not source or mount.get('type') == 'volume':
                    continue
                if self._is_system_path(source):
                    gaps.append({
                        'container': container_key,
                        'type': 'volume_security',
                        'severity': 'medium',
                        'description': f'Bind mount to system directory: {source}',
                        'recommendation': 'Verify this mount is necessary and secure'
                    })

        return gaps

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def save_audit_results(self, output_dir: Path) -> None:
        """Save comprehensive audit results.

        Writes ``COMPLETE_CONTAINER_AUDIT.yaml``, one YAML file per container
        under ``individual_configs/``, one generated Compose file per
        container under ``compose_templates/`` and the Markdown summary.

        Raises:
            RuntimeError: If PyYAML is not installed.
        """
        yaml_module = _require_yaml()
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        results = self.audit_results

        # Generate migration checklist
        results['migration_checklist'] = self.generate_migration_checklist()

        # Identify configuration gaps
        results['configuration_gaps'] = self.identify_configuration_gaps()

        # Save complete audit
        with open(output_dir / 'COMPLETE_CONTAINER_AUDIT.yaml', 'w', encoding='utf-8') as f:
            yaml_module.dump(results, f, default_flow_style=False, sort_keys=False)

        # Save individual container configs
        configs_dir = output_dir / 'individual_configs'
        configs_dir.mkdir(exist_ok=True)

        for container_key, config in results['container_inventory'].items():
            safe_name = container_key.replace('::', '_').replace('/', '_')
            with open(configs_dir / f'{safe_name}_config.yaml', 'w', encoding='utf-8') as f:
                yaml_module.dump(config, f, default_flow_style=False)

        # Save compose templates
        compose_dir = output_dir / 'compose_templates'
        compose_dir.mkdir(exist_ok=True)

        for template_key, template in results['compose_templates'].items():
            if '::compose::' in template_key:  # Skip original compose files
                continue
            safe_name = template_key.replace('::', '_').replace('/', '_')
            with open(compose_dir / f'{safe_name}_compose.yml', 'w', encoding='utf-8') as f:
                yaml_module.dump({'services': template}, f, default_flow_style=False)

        # Generate human-readable summary
        self.generate_summary_report(output_dir)

    def generate_summary_report(self, output_dir: Path) -> None:
        """Generate human-readable summary report.

        Writes ``CONTAINER_AUDIT_SUMMARY.md`` into *output_dir* from the
        current contents of ``self.audit_results``.
        """
        results = self.audit_results
        inventory = results['container_inventory']

        # Overview
        report = [
            "# COMPREHENSIVE CONTAINER CONFIGURATION AUDIT",
            "=" * 50,
            "",
            f"**Total Containers Analyzed:** {len(inventory)}",
            f"**Privileged Containers:** {len(results['privileged_containers'])}",
            f"**Containers with Device Access:** {len(results['device_mappings'])}",
            f"**Containers with Custom Security:** {len(results['security_configurations'])}",
            "",
        ]

        # Privileged containers section
        if results['privileged_containers']:
            report.extend([
                "## PRIVILEGED CONTAINERS",
                "These containers require special attention during migration:",
                "",
            ])
            for container in results['privileged_containers']:
                config = inventory[container]
                report.append(f"### {container}")
                report.append(f"- **Image:** {config['image']['tag']}")
                report.append(f"- **Host:** {config['host_system']}")
                if config['devices']['devices']:
                    report.append("- **Device Access:**")
                    for device in config['devices']['devices']:
                        # Two spaces so Markdown nests this under the bullet above.
                        report.append(
                            f"  - {device['PathOnHost']} -> {device['PathInContainer']}")
                report.append("")

        # Configuration gaps
        if results['configuration_gaps']:
            report.extend(["## CONFIGURATION GAPS & RECOMMENDATIONS", ""])

            gaps_by_severity = defaultdict(list)
            for gap in results['configuration_gaps']:
                gaps_by_severity[gap['severity']].append(gap)

            for severity in ['high', 'medium', 'low']:
                if gaps_by_severity[severity]:
                    report.append(f"### {severity.upper()} Priority Issues")
                    for gap in gaps_by_severity[severity]:
                        report.append(f"- **{gap['container']}:** {gap['description']}")
                        report.append(f"  - *Recommendation:* {gap['recommendation']}")
                    report.append("")

        # Migration checklist summary
        if results['migration_checklist']:
            report.extend(["## CRITICAL MIGRATION TASKS", ""])

            for task_category, tasks in results['migration_checklist'].items():
                report.append(f"### {task_category}")
                report.extend(f"- {task}" for task in tasks)
                report.append("")

        # Network analysis (built-in networks do not have to be created)
        networks_found = {
            network
            for config in inventory.values()
            for network in config['networks']['networks']
            if network not in _BUILTIN_NETWORKS
        }

        if networks_found:
            report.extend([
                "## REQUIRED NETWORKS",
                "These Docker networks must be created:",
                "",
            ])
            report.extend(f"- {network}" for network in sorted(networks_found))
            report.append("")

        # Volume analysis
        volumes_found = {
            mount['source']
            for config in inventory.values()
            for mount in config['volumes']['detailed_mounts']
            if mount['source'] and not mount['source'].startswith(_DOCKER_DATA_ROOT)
        }

        if volumes_found:
            report.extend([
                "## DATA DIRECTORIES TO BACKUP",
                "These host directories contain persistent data:",
                "",
            ])
            report.extend(f"- {volume}" for volume in sorted(volumes_found))
            report.append("")

        # Save report
        with open(Path(output_dir) / 'CONTAINER_AUDIT_SUMMARY.md', 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))


def main():
    """Command-line entry point: audit the directory given as the only argument."""
    if len(sys.argv) != 2:
        print("Usage: python3 comprehensive_container_audit.py <discovery_root>")
        sys.exit(1)

    discovery_root = sys.argv[1]

    if not os.path.isdir(discovery_root):
        print(f"Error: Directory {discovery_root} does not exist")
        sys.exit(1)

    if yaml is None:
        print("Error: PyYAML is required (pip install pyyaml)")
        sys.exit(1)

    print("🚀 Starting Comprehensive Container Configuration Audit...")
    print("=" * 60)

    auditor = ContainerConfigurationAuditor(discovery_root)
    auditor.audit_all_containers()

    output_dir = Path(discovery_root) / 'container_audit_results'
    auditor.save_audit_results(output_dir)

    print("")
    print("✅ Audit Complete!")
    print(f"📊 Results saved to: {output_dir}")
    print(f"📋 Summary report: {output_dir}/CONTAINER_AUDIT_SUMMARY.md")
    print(f"🔧 Full audit data: {output_dir}/COMPLETE_CONTAINER_AUDIT.yaml")
    print(f"📁 Individual configs: {output_dir}/individual_configs/")
    print(f"🐳 Compose templates: {output_dir}/compose_templates/")


if __name__ == "__main__":
    main()