#!/usr/bin/env python3 """ Domain Analyzer - Umfassende Domain und Mailserver Analyse Analysiert DNS, WHOIS, Ports, SSL-Zertifikate, IP-Informationen und mehr """ import dns.resolver import dns.reversename import socket import ssl import json import argparse import sys from datetime import datetime from typing import Dict, List, Any import concurrent.futures import subprocess import re # Farben für Terminal-Output class Colors: HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' END = '\033[0m' BOLD = '\033[1m' def print_header(text: str): print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*80}{Colors.END}") print(f"{Colors.BOLD}{Colors.HEADER}{text.center(80)}{Colors.END}") print(f"{Colors.BOLD}{Colors.HEADER}{'='*80}{Colors.END}\n") def print_section(text: str): print(f"\n{Colors.BOLD}{Colors.CYAN}[*] {text}{Colors.END}") def print_info(key: str, value: str): print(f"{Colors.GREEN} ├─ {key}:{Colors.END} {value}") def print_warning(text: str): print(f"{Colors.YELLOW} ⚠ {text}{Colors.END}") def print_error(text: str): print(f"{Colors.RED} ✗ {text}{Colors.END}") def print_success(text: str): print(f"{Colors.GREEN} ✓ {text}{Colors.END}") class DomainAnalyzer: def __init__(self, domain: str): self.domain = domain.lower().strip() self.results = { 'domain': self.domain, 'timestamp': datetime.now().isoformat(), 'dns': {}, 'whois': {}, 'ips': [], 'ports': {}, 'ssl_certificates': [], 'email_security': {}, 'http_info': {}, 'subdomains': [] } def analyze_all(self): """Führt alle Analysen durch""" print_header(f"DOMAIN ANALYSE: {self.domain}") self.analyze_dns() self.analyze_whois() self.analyze_ips() self.check_email_security() self.scan_common_ports() self.analyze_ssl_certificates() self.analyze_http() self.find_subdomains() self.analyze_ownership() # Neue Funktion für Inhaberschaft return self.results def analyze_ownership(self): """Zusätzliche Analyse zur Inhaberschaft""" print_section("Zusätzliche Inhaber-Informationen") # ASN Organisation als Hinweis if self.results['ips']: print_info("Hosting-Provider", "Basierend auf IP-Informationen:") for ip_info in self.results['ips'][:2]: if ip_info.get('organization'): print(f" {ip_info['ip']}: {ip_info['organization']}") if ip_info.get('country'): print(f" Land: {ip_info['country']}") # Nameserver-Organisation if 'NS' in self.results['dns'] and self.results['dns']['NS']: print() print_info("DNS-Provider", "Basierend auf Nameservern:") for ns in self.results['dns']['NS'][:2]: ns_clean = ns.rstrip('.') # Versuche WHOIS für Nameserver try: result = subprocess.run( ['whois', ns_clean], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: org_match = re.search(r'OrgName:\s*(.+)', result.stdout) if org_match: print(f" {ns_clean}: {org_match.group(1).strip()}") else: print(f" {ns_clean}") except: print(f" {ns_clean}") # Zertifikat-Informationen if self.results['ssl_certificates']: print() print_info("SSL-Zertifikat Inhaber", "") for cert in self.results['ssl_certificates'][:1]: if cert.get('subject'): subject = cert['subject'] if 'organizationName' in subject: print(f" Organisation: {subject['organizationName']}") if 'commonName' in subject: print(f" Common Name: {subject['commonName']}") # Zusammenfassung print() print_header("📋 ZUSAMMENFASSUNG VERANTWORTLICHKEIT") owner = "Unbekannt" if 'registrant_organization' in self.results.get('whois', {}): owner = self.results['whois']['registrant_organization'][0] elif 'registrant_name' in self.results.get('whois', {}): owner = self.results['whois']['registrant_name'][0] print_success(f"Domain-Inhaber: {owner}") if 'registrar' in self.results.get('whois', {}): print_info("Registrar", self.results['whois']['registrar'][0]) if self.results['ips'] and self.results['ips'][0].get('organization'): print_info("Hosting", self.results['ips'][0]['organization']) def analyze_dns(self): """DNS-Records analysieren""" print_section("DNS Records") record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CAA', 'CNAME'] for record_type in record_types: try: answers = dns.resolver.resolve(self.domain, record_type) records = [] for rdata in answers: if record_type == 'MX': records.append(f"{rdata.preference} {rdata.exchange}") elif record_type == 'SOA': records.append(f"{rdata.mname} {rdata.rname}") else: records.append(str(rdata)) self.results['dns'][record_type] = records print_info(record_type, f"{len(records)} Record(s)") for record in records: print(f" {record}") except dns.resolver.NoAnswer: self.results['dns'][record_type] = [] except dns.resolver.NXDOMAIN: print_error(f"Domain {self.domain} existiert nicht!") sys.exit(1) except Exception as e: print_warning(f"{record_type}: Fehler - {str(e)}") def get_base_domain(self, domain: str) -> str: """Extrahiert die Basis-Domain (z.B. example.com aus sub.example.com)""" parts = domain.split('.') if len(parts) >= 2: # Nimm die letzten 2 Teile (domain.tld) # Für .co.uk etc. müsste man eine TLD-Liste verwenden, aber für die meisten Fälle reicht das return '.'.join(parts[-2:]) return domain def analyze_whois(self): """WHOIS-Informationen abrufen""" print_section("WHOIS Information") domain_to_check = self.domain is_subdomain = False # Prüfe ob es eine Subdomain ist (mehr als 2 Teile) parts = self.domain.split('.') if len(parts) > 2: is_subdomain = True base_domain = self.get_base_domain(self.domain) print_info("Subdomain erkannt", f"Prüfe Hauptdomain: {base_domain}") domain_to_check = base_domain try: result = subprocess.run( ['whois', domain_to_check], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: whois_data = result.stdout # Prüfe ob "No match" in der Antwort ist if 'No match' in whois_data or 'NOT FOUND' in whois_data: print_error(f"Keine WHOIS-Daten für {domain_to_check} gefunden") return # Erweiterte Muster für Inhaber-Informationen patterns = { 'registrar': [ r'Registrar:\s*(.+)', r'Registrar Name:\s*(.+)' ], 'registrant_organization': [ r'Registrant Organization:\s*(.+)', r'Organization:\s*(.+)', r'org:\s*(.+)' ], 'registrant_name': [ r'Registrant Name:\s*(.+)', r'Registrant:\s*(.+)' ], 'registrant_country': [ r'Registrant Country:\s*([A-Z]{2})', r'Country:\s*([A-Z]{2})' ], 'registrant_email': [ r'Registrant Email:\s*(.+)', r'Email:\s*(.+)' ], 'creation_date': [ r'Creation Date:\s*(.+)', r'Created:\s*(.+)', r'created:\s*(.+)' ], 'expiration_date': [ r'Registry Expiry Date:\s*(.+)', r'Registrar Registration Expiration Date:\s*(.+)', r'Expiry Date:\s*(.+)', r'paid-till:\s*(.+)' ], 'status': [ r'Domain Status:\s*(.+)', r'Status:\s*(.+)' ], 'name_servers': [ r'Name Server:\s*(.+)', r'nserver:\s*(.+)' ], } self.results['whois']['domain_checked'] = domain_to_check if is_subdomain: self.results['whois']['note'] = f"Subdomain - WHOIS von Hauptdomain {domain_to_check}" # Extrahiere Informationen mit mehreren Pattern-Varianten for key, pattern_list in patterns.items(): if not isinstance(pattern_list, list): pattern_list = [pattern_list] for pattern in pattern_list: matches = re.findall(pattern, whois_data, re.IGNORECASE | re.MULTILINE) if matches: # Bereinige Matches cleaned_matches = [m.strip() for m in matches if m.strip() and m.strip() != 'REDACTED FOR PRIVACY'] if cleaned_matches: self.results['whois'][key] = cleaned_matches if key == 'name_servers': print_info(key.replace('_', ' ').title(), f"{len(cleaned_matches)} Server") for ns in cleaned_matches[:5]: print(f" {ns}") elif key == 'status': print_info(key.replace('_', ' ').title(), f"{len(cleaned_matches)} Status(se)") for status in cleaned_matches[:3]: print(f" {status}") else: display_name = key.replace('_', ' ').title() print_info(display_name, cleaned_matches[0]) break # Erstes passendes Pattern gefunden # WICHTIG: Inhaber hervorheben print() print_header("🔍 DOMAIN-INHABER") if 'registrant_organization' in self.results['whois']: print_success(f"Organisation: {self.results['whois']['registrant_organization'][0]}") elif 'registrant_name' in self.results['whois']: print_success(f"Name: {self.results['whois']['registrant_name'][0]}") else: print_warning("Inhaber nicht identifizierbar (möglicherweise durch Privacy Protection verborgen)") if 'registrant_country' in self.results['whois']: print_info("Land", self.results['whois']['registrant_country'][0]) if 'registrant_email' in self.results['whois']: email = self.results['whois']['registrant_email'][0] if 'contact-form' not in email and 'privacy' not in email.lower(): print_info("Email", email) if 'registrar' in self.results['whois']: print_info("Registrar", self.results['whois']['registrar'][0]) self.results['whois']['raw'] = whois_data else: print_error("WHOIS-Abfrage fehlgeschlagen") except FileNotFoundError: print_error("whois-Tool nicht installiert") print_info("Installation", "sudo pacman -S whois") except subprocess.TimeoutExpired: print_error("WHOIS-Abfrage Timeout") except Exception as e: print_error(f"WHOIS-Fehler: {str(e)}") def analyze_ips(self): """IP-Adressen analysieren""" print_section("IP-Adress-Analyse") ips = [] # A Records if 'A' in self.results['dns']: ips.extend(self.results['dns']['A']) # AAAA Records if 'AAAA' in self.results['dns']: ips.extend(self.results['dns']['AAAA']) for ip in ips: ip_info = self.get_ip_info(ip) self.results['ips'].append(ip_info) print_info("IP", ip) if ip_info.get('reverse_dns'): print(f" Reverse DNS: {ip_info['reverse_dns']}") if ip_info.get('asn'): print(f" ASN: {ip_info['asn']}") if ip_info.get('country'): print(f" Land: {ip_info['country']}") def get_ip_info(self, ip: str) -> Dict[str, Any]: """Detaillierte IP-Informationen""" info = {'ip': ip} # Reverse DNS try: rev_name = dns.reversename.from_address(ip) rev_dns = str(dns.resolver.resolve(rev_name, "PTR")[0]) info['reverse_dns'] = rev_dns except: info['reverse_dns'] = None # ASN und Geolocation via whois try: result = subprocess.run( ['whois', ip], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: whois_output = result.stdout # ASN extrahieren asn_match = re.search(r'AS(\d+)', whois_output) if asn_match: info['asn'] = f"AS{asn_match.group(1)}" # Organisation org_match = re.search(r'OrgName:\s*(.+)', whois_output) if org_match: info['organization'] = org_match.group(1).strip() # Land country_match = re.search(r'Country:\s*([A-Z]{2})', whois_output) if country_match: info['country'] = country_match.group(1) except: pass return info def check_email_security(self): """Email-Sicherheit prüfen (SPF, DMARC, DKIM)""" print_section("Email-Sicherheit") # SPF prüfen spf_record = None if 'TXT' in self.results['dns']: for txt in self.results['dns']['TXT']: if 'v=spf1' in txt.lower(): spf_record = txt break if spf_record: print_success(f"SPF: Vorhanden") print(f" {spf_record}") self.results['email_security']['spf'] = spf_record else: print_warning("SPF: Nicht gefunden!") self.results['email_security']['spf'] = None # DMARC prüfen try: dmarc_answers = dns.resolver.resolve(f'_dmarc.{self.domain}', 'TXT') dmarc_record = str(dmarc_answers[0]) print_success(f"DMARC: Vorhanden") print(f" {dmarc_record}") self.results['email_security']['dmarc'] = dmarc_record except: print_warning("DMARC: Nicht gefunden!") self.results['email_security']['dmarc'] = None # DKIM Selektoren testen (häufige) print_info("DKIM", "Teste häufige Selektoren...") dkim_selectors = ['default', 'google', 'k1', 'k2', 'selector1', 'selector2', 'mail', 'dkim'] dkim_found = [] for selector in dkim_selectors: try: dkim_query = f'{selector}._domainkey.{self.domain}' dkim_answers = dns.resolver.resolve(dkim_query, 'TXT') dkim_record = str(dkim_answers[0]) dkim_found.append({'selector': selector, 'record': dkim_record}) print(f" ✓ {selector}: Gefunden") except: pass if dkim_found: self.results['email_security']['dkim'] = dkim_found else: print(f" Keine DKIM-Records mit Standard-Selektoren gefunden") self.results['email_security']['dkim'] = [] def scan_common_ports(self): """Häufige Ports scannen""" print_section("Port-Scan (häufige Ports)") # IP-Adressen sammeln ips_to_scan = [] if 'A' in self.results['dns']: ips_to_scan.extend(self.results['dns']['A']) if not ips_to_scan: print_warning("Keine IPv4-Adressen zum Scannen") return # Häufige Ports common_ports = { 21: 'FTP', 22: 'SSH', 25: 'SMTP', 53: 'DNS', 80: 'HTTP', 110: 'POP3', 143: 'IMAP', 443: 'HTTPS', 465: 'SMTPS', 587: 'SMTP Submission', 993: 'IMAPS', 995: 'POP3S', 3306: 'MySQL', 3389: 'RDP', 5432: 'PostgreSQL', 8080: 'HTTP-Alt', 8443: 'HTTPS-Alt' } for ip in ips_to_scan[:1]: # Nur erste IP scannen print_info("Scanne IP", ip) open_ports = [] with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: future_to_port = { executor.submit(self.check_port, ip, port): port for port in common_ports.keys() } for future in concurrent.futures.as_completed(future_to_port): port = future_to_port[future] try: is_open = future.result() if is_open: service = common_ports[port] open_ports.append({'port': port, 'service': service}) print_success(f"Port {port} ({service}): OFFEN") except: pass self.results['ports'][ip] = open_ports def check_port(self, ip: str, port: int, timeout: float = 2.0) -> bool: """Prüft ob ein Port offen ist""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip, port)) sock.close() return result == 0 except: return False def analyze_ssl_certificates(self): """SSL/TLS-Zertifikate analysieren""" print_section("SSL/TLS-Zertifikate") # Ports mit SSL/TLS ssl_ports = [443, 465, 587, 993, 995, 8443] ips_to_check = [] if 'A' in self.results['dns']: ips_to_check.extend(self.results['dns']['A']) for ip in ips_to_check[:1]: # Nur erste IP for port in ssl_ports: if self.check_port(ip, port, timeout=1.0): cert_info = self.get_ssl_certificate(self.domain, port) if cert_info: self.results['ssl_certificates'].append(cert_info) print_info(f"Port {port}", "Zertifikat gefunden") print(f" Aussteller: {cert_info.get('issuer', 'N/A')}") print(f" Gültig von: {cert_info.get('not_before', 'N/A')}") print(f" Gültig bis: {cert_info.get('not_after', 'N/A')}") if cert_info.get('san'): print(f" SANs: {', '.join(cert_info['san'][:3])}...") def get_ssl_certificate(self, hostname: str, port: int = 443) -> Dict[str, Any]: """SSL-Zertifikat abrufen und analysieren""" try: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((hostname, port), timeout=5) as sock: with context.wrap_socket(sock, server_hostname=hostname) as ssock: cert = ssock.getpeercert() cert_info = { 'port': port, 'subject': dict(x[0] for x in cert.get('subject', [])), 'issuer': dict(x[0] for x in cert.get('issuer', [])).get('organizationName', 'N/A'), 'version': cert.get('version'), 'serial_number': cert.get('serialNumber'), 'not_before': cert.get('notBefore'), 'not_after': cert.get('notAfter'), 'san': [] } # Subject Alternative Names if 'subjectAltName' in cert: cert_info['san'] = [item[1] for item in cert['subjectAltName']] return cert_info except: return None def analyze_http(self): """HTTP/HTTPS-Informationen""" print_section("HTTP/HTTPS-Analyse") for protocol in ['https', 'http']: try: import requests url = f"{protocol}://{self.domain}" response = requests.get(url, timeout=10, allow_redirects=True) http_info = { 'protocol': protocol, 'status_code': response.status_code, 'final_url': response.url, 'headers': dict(response.headers) } self.results['http_info'][protocol] = http_info print_info(protocol.upper(), f"Status {response.status_code}") # Interessante Headers interesting_headers = [ 'Server', 'X-Powered-By', 'Content-Security-Policy', 'Strict-Transport-Security', 'X-Frame-Options' ] for header in interesting_headers: if header in response.headers: print(f" {header}: {response.headers[header]}") break # Wenn HTTPS funktioniert, HTTP überspringen except ImportError: print_warning("requests-Modul nicht installiert (pip install requests)") break except Exception as e: if protocol == 'http': print_warning(f"HTTP nicht erreichbar") def find_subdomains(self): """Häufige Subdomains finden""" print_section("Subdomain-Enumeration") common_subdomains = [ 'www', 'mail', 'webmail', 'smtp', 'pop', 'imap', 'ftp', 'admin', 'portal', 'api', 'dev', 'test', 'staging', 'blog', 'shop', 'store', 'cpanel' ] print_info("Teste", f"{len(common_subdomains)} häufige Subdomains") found = [] for subdomain in common_subdomains: full_domain = f"{subdomain}.{self.domain}" try: answers = dns.resolver.resolve(full_domain, 'A') ips = [str(rdata) for rdata in answers] found.append({'subdomain': full_domain, 'ips': ips}) print_success(f"{full_domain} → {', '.join(ips)}") except: pass self.results['subdomains'] = found if not found: print(f" Keine Standard-Subdomains gefunden") def save_results(self, output_file: str = None): """Ergebnisse speichern""" if output_file: with open(output_file, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=2, ensure_ascii=False) print_success(f"\nErgebnisse gespeichert in: {output_file}") def main(): parser = argparse.ArgumentParser( description='Domain Analyzer - Umfassende Domain-Analyse', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' Beispiele: %(prog)s example.com %(prog)s example.com google.com github.com %(prog)s -f domains.txt %(prog)s example.com -o results.json %(prog)s example.com --no-color ''' ) parser.add_argument('domains', nargs='*', help='Domain(s) zum Analysieren') parser.add_argument('-f', '--file', help='Datei mit Domains (eine pro Zeile)') parser.add_argument('-o', '--output', help='JSON-Output-Datei (nur bei einer Domain)') parser.add_argument('--output-dir', help='Verzeichnis für JSON-Outputs (bei mehreren Domains)') parser.add_argument('--no-color', action='store_true', help='Keine Farben im Output') parser.add_argument('--delay', type=float, default=1.0, help='Verzögerung zwischen Domains in Sekunden (Standard: 1.0)') args = parser.parse_args() # Farben deaktivieren wenn gewünscht if args.no_color: for attr in dir(Colors): if not attr.startswith('_'): setattr(Colors, attr, '') # Domains sammeln domains = [] if args.file: try: with open(args.file, 'r') as f: domains.extend([line.strip() for line in f if line.strip() and not line.startswith('#')]) except FileNotFoundError: print_error(f"Datei nicht gefunden: {args.file}") sys.exit(1) if args.domains: domains.extend(args.domains) if not domains: parser.print_help() print_error("\nFehler: Mindestens eine Domain erforderlich!") sys.exit(1) # Output-Verzeichnis erstellen wenn mehrere Domains if len(domains) > 1 and args.output_dir: import os os.makedirs(args.output_dir, exist_ok=True) try: all_results = [] for idx, domain in enumerate(domains, 1): if len(domains) > 1: print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}") print(f"{Colors.BOLD}{Colors.BLUE}Domain {idx}/{len(domains)}: {domain}{Colors.END}") print(f"{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}") analyzer = DomainAnalyzer(domain) results = analyzer.analyze_all() all_results.append(results) # Output für einzelne Domain if len(domains) == 1 and args.output: analyzer.save_results(args.output) elif len(domains) > 1 and args.output_dir: output_file = f"{args.output_dir}/{domain.replace('/', '_')}.json" analyzer.save_results(output_file) # Verzögerung zwischen Domains (außer bei der letzten) if idx < len(domains): import time time.sleep(args.delay) print_header("ALLE ANALYSEN ABGESCHLOSSEN") if len(domains) > 1: print_section("Zusammenfassung") print(f" Analysierte Domains: {len(domains)}") # Quick-Übersicht for result in all_results: domain = result['domain'] owner = "Unbekannt" if 'whois' in result and 'registrant_organization' in result['whois']: owner = result['whois']['registrant_organization'][0] elif 'whois' in result and 'registrant_name' in result['whois']: owner = result['whois']['registrant_name'][0] print(f"\n {Colors.CYAN}{domain}{Colors.END}") print(f" Inhaber: {owner}") if result.get('ips'): ips = [ip['ip'] for ip in result['ips']] print(f" IPs: {', '.join(ips[:2])}") except KeyboardInterrupt: print_error("\n\nAbgebrochen durch Benutzer") sys.exit(1) except Exception as e: print_error(f"\nFehler: {str(e)}") import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': main()