Files
domain_analyzer/domain_analyzer.py

767 lines
29 KiB
Python

#!/usr/bin/env python3
"""
Domain Analyzer - Umfassende Domain und Mailserver Analyse
Analysiert DNS, WHOIS, Ports, SSL-Zertifikate, IP-Informationen und mehr
"""
import argparse
import concurrent.futures
import json
import os
import re
import socket
import ssl
import subprocess
import sys
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

import dns.resolver
import dns.reversename
# Farben für Terminal-Output
class Colors:
    """ANSI escape sequences used to colour terminal output.

    The values are plain class attributes on purpose: ``main`` replaces every
    public attribute with an empty string when ``--no-color`` is given.
    """

    # Foreground colours
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'

    # Text attributes
    END = '\033[0m'    # resets colour and attributes
    BOLD = '\033[1m'
def print_header(text: str):
    """Print *text* centred in an 80-column banner framed by '=' rules."""
    style = f"{Colors.BOLD}{Colors.HEADER}"
    rule = f"{style}{'='*80}{Colors.END}"
    print(f"\n{rule}")
    print(f"{style}{text.center(80)}{Colors.END}")
    print(f"{rule}\n")
def print_section(text: str):
    """Print a bold cyan ``[*]`` section heading preceded by a blank line."""
    style = f"{Colors.BOLD}{Colors.CYAN}"
    print(f"\n{style}[*] {text}{Colors.END}")
def print_info(key: str, value: str):
    """Print one ``key: value`` line; only the key part is coloured green."""
    label = f"{Colors.GREEN} ├─ {key}:{Colors.END}"
    # print() joins its arguments with a single space, as the original f-string did.
    print(label, value)
def print_warning(text: str):
    """Print *text* in yellow."""
    print(Colors.YELLOW, text, Colors.END, sep="")
def print_error(text: str):
    """Print *text* in red."""
    print(Colors.RED, text, Colors.END, sep="")
def print_success(text: str):
    """Print *text* in green."""
    print(Colors.GREEN, text, Colors.END, sep="")
class DomainAnalyzer:
    """Collect DNS, WHOIS, IP, port, TLS, HTTP and subdomain data for a domain.

    Each analysis step prints its findings to stdout and stores them in
    ``self.results``; ``save_results`` can dump that dict as JSON.
    """

    # Ports probed by scan_common_ports, mapped to the service name shown.
    COMMON_PORTS = {
        21: 'FTP',
        22: 'SSH',
        25: 'SMTP',
        53: 'DNS',
        80: 'HTTP',
        110: 'POP3',
        143: 'IMAP',
        443: 'HTTPS',
        465: 'SMTPS',
        587: 'SMTP Submission',
        993: 'IMAPS',
        995: 'POP3S',
        3306: 'MySQL',
        3389: 'RDP',
        5432: 'PostgreSQL',
        8080: 'HTTP-Alt',
        8443: 'HTTPS-Alt',
    }

    # Ports on which analyze_ssl_certificates attempts a TLS handshake.
    SSL_PORTS = (443, 465, 587, 993, 995, 8443)

    # Common two-label public suffixes under which registrations happen at
    # the third level. Deliberately small and NOT exhaustive; a complete
    # answer would need the Public Suffix List.
    _MULTI_LABEL_SUFFIXES = frozenset({
        'co.uk', 'org.uk', 'ac.uk', 'gov.uk', 'me.uk', 'ltd.uk', 'plc.uk',
        'net.uk', 'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au',
        'co.nz', 'org.nz', 'net.nz', 'co.jp', 'or.jp', 'ne.jp', 'ac.jp',
        'go.jp', 'co.kr', 'or.kr', 'com.br', 'net.br', 'org.br', 'com.cn',
        'net.cn', 'org.cn', 'com.tr', 'com.mx', 'co.za', 'org.za', 'co.in',
        'net.in', 'org.in', 'co.at', 'or.at', 'com.sg', 'com.hk', 'com.tw',
        'co.il',
    })

    # WHOIS field -> regex variants, tried in order; the first variant that
    # yields a non-redacted value wins. Dict order is the print order.
    _WHOIS_PATTERNS = {
        'registrar': [
            r'Registrar:\s*(.+)',
            r'Registrar Name:\s*(.+)',
        ],
        'registrant_organization': [
            r'Registrant Organization:\s*(.+)',
            r'Organization:\s*(.+)',
            r'org:\s*(.+)',
        ],
        'registrant_name': [
            r'Registrant Name:\s*(.+)',
            r'Registrant:\s*(.+)',
        ],
        'registrant_country': [
            r'Registrant Country:\s*([A-Z]{2})',
            r'Country:\s*([A-Z]{2})',
        ],
        'registrant_email': [
            r'Registrant Email:\s*(.+)',
            r'Email:\s*(.+)',
        ],
        'creation_date': [
            r'Creation Date:\s*(.+)',
            r'Created:\s*(.+)',
            r'created:\s*(.+)',
        ],
        'expiration_date': [
            r'Registry Expiry Date:\s*(.+)',
            r'Registrar Registration Expiration Date:\s*(.+)',
            r'Expiry Date:\s*(.+)',
            r'paid-till:\s*(.+)',
        ],
        'status': [
            r'Domain Status:\s*(.+)',
            r'Status:\s*(.+)',
        ],
        'name_servers': [
            r'Name Server:\s*(.+)',
            r'nserver:\s*(.+)',
        ],
    }

    def __init__(self, domain: str):
        """Normalise *domain* (lower-case, stripped) and create empty results."""
        self.domain = domain.lower().strip()
        self.results = {
            'domain': self.domain,
            'timestamp': datetime.now().isoformat(),
            'dns': {},
            'whois': {},
            'ips': [],
            'ports': {},
            'ssl_certificates': [],
            'email_security': {},
            'http_info': {},
            'subdomains': [],
        }

    def analyze_all(self):
        """Run every analysis step in order and return ``self.results``.

        Later steps read what earlier ones stored (the IP, port and TLS
        steps use the A/AAAA records found by ``analyze_dns``), so the
        order matters.
        """
        print_header(f"DOMAIN ANALYSE: {self.domain}")
        self.analyze_dns()
        self.analyze_whois()
        self.analyze_ips()
        self.check_email_security()
        self.scan_common_ports()
        self.analyze_ssl_certificates()
        self.analyze_http()
        self.find_subdomains()
        self.analyze_ownership()
        return self.results

    @staticmethod
    def _run_whois(target: str, timeout: float):
        """Run the external ``whois`` tool for *target*.

        Returns the ``subprocess.CompletedProcess``. Raises ``OSError``
        (e.g. ``FileNotFoundError`` when the tool is missing) or
        ``subprocess.TimeoutExpired`` exactly as ``subprocess.run`` does.
        """
        return subprocess.run(
            ['whois', target],
            capture_output=True,
            text=True,
            # WHOIS servers do not reliably answer in UTF-8; undecodable
            # bytes must not abort the whole lookup.
            errors='replace',
            timeout=timeout,
        )

    def analyze_ownership(self):
        """Print ownership hints gathered from IP, nameserver, TLS and WHOIS data.

        Reads ``self.results`` only; nothing new is stored.
        """
        print_section("Zusätzliche Inhaber-Informationen")

        # Hosting provider, derived from the organisation owning the IPs.
        ip_entries = self.results['ips']
        if ip_entries:
            print_info("Hosting-Provider", "Basierend auf IP-Informationen:")
            for ip_info in ip_entries[:2]:
                if ip_info.get('organization'):
                    print(f" {ip_info['ip']}: {ip_info['organization']}")
                    if ip_info.get('country'):
                        print(f" Land: {ip_info['country']}")

        # DNS provider, derived from a WHOIS lookup on the nameservers.
        name_servers = self.results['dns'].get('NS')
        if name_servers:
            print()
            print_info("DNS-Provider", "Basierend auf Nameservern:")
            for ns in name_servers[:2]:
                ns_clean = ns.rstrip('.')
                org = None
                try:
                    result = self._run_whois(ns_clean, timeout=5)
                except (OSError, subprocess.SubprocessError):
                    result = None
                if result is not None and result.returncode == 0:
                    org_match = re.search(r'OrgName:\s*(.+)', result.stdout)
                    if org_match:
                        org = org_match.group(1).strip()
                # Always list the nameserver, with or without an organisation.
                if org:
                    print(f" {ns_clean}: {org}")
                else:
                    print(f" {ns_clean}")

        # Subject of the first certificate found.
        if self.results['ssl_certificates']:
            print()
            print_info("SSL-Zertifikat Inhaber", "")
            for cert in self.results['ssl_certificates'][:1]:
                subject = cert.get('subject')
                if subject:
                    if 'organizationName' in subject:
                        print(f" Organisation: {subject['organizationName']}")
                    if 'commonName' in subject:
                        print(f" Common Name: {subject['commonName']}")

        # Summary of who is responsible for the domain.
        print()
        print_header("📋 ZUSAMMENFASSUNG VERANTWORTLICHKEIT")
        whois = self.results.get('whois', {})
        owner = "Unbekannt"
        if 'registrant_organization' in whois:
            owner = whois['registrant_organization'][0]
        elif 'registrant_name' in whois:
            owner = whois['registrant_name'][0]
        print_success(f"Domain-Inhaber: {owner}")
        if 'registrar' in whois:
            print_info("Registrar", whois['registrar'][0])
        if ip_entries and ip_entries[0].get('organization'):
            print_info("Hosting", ip_entries[0]['organization'])

    def analyze_dns(self):
        """Resolve the common record types and store them in ``results['dns']``.

        A type with no answer is stored as an empty list; a type whose
        lookup fails for another reason is reported and left unset.

        NOTE: an NXDOMAIN answer terminates the process via ``sys.exit(1)``.
        """
        print_section("DNS Records")
        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CAA', 'CNAME']
        for record_type in record_types:
            try:
                answers = dns.resolver.resolve(self.domain, record_type)
                if record_type == 'MX':
                    records = [f"{rdata.preference} {rdata.exchange}" for rdata in answers]
                elif record_type == 'SOA':
                    records = [f"{rdata.mname} {rdata.rname}" for rdata in answers]
                else:
                    records = [str(rdata) for rdata in answers]
                self.results['dns'][record_type] = records
                print_info(record_type, f"{len(records)} Record(s)")
                for record in records:
                    print(f" {record}")
            except dns.resolver.NoAnswer:
                self.results['dns'][record_type] = []
            except dns.resolver.NXDOMAIN:
                print_error(f"Domain {self.domain} existiert nicht!")
                sys.exit(1)
            except Exception as e:
                print_warning(f"{record_type}: Fehler - {str(e)}")

    def get_base_domain(self, domain: str) -> str:
        """Return the registrable base of *domain* (``sub.example.com`` -> ``example.com``).

        A trailing root dot is ignored. For the suffixes listed in
        ``_MULTI_LABEL_SUFFIXES`` (e.g. ``co.uk``) three labels are kept.
        The list is a heuristic, not the full Public Suffix List. Names
        with fewer than two labels are returned unchanged.
        """
        labels = domain.rstrip('.').split('.')
        if len(labels) < 2:
            return domain
        suffix = '.'.join(labels[-2:])
        if len(labels) >= 3 and suffix.lower() in DomainAnalyzer._MULTI_LABEL_SUFFIXES:
            return '.'.join(labels[-3:])
        return suffix

    def analyze_whois(self):
        """Query WHOIS for the domain (or its base domain) and store the parsed fields.

        Fills ``results['whois']`` with ``domain_checked``, an optional
        ``note`` for subdomains, one list per recognised field and the
        ``raw`` response. All failures are reported on stdout, never raised.
        """
        print_section("WHOIS Information")

        # WHOIS only knows registrable domains, so subdomains are looked up
        # via their base domain.
        base_domain = self.get_base_domain(self.domain)
        is_subdomain = base_domain != self.domain.rstrip('.')
        domain_to_check = base_domain if is_subdomain else self.domain
        if is_subdomain:
            print_info("Subdomain erkannt", f"Prüfe Hauptdomain: {base_domain}")

        try:
            result = self._run_whois(domain_to_check, timeout=10)
            if result.returncode != 0:
                print_error("WHOIS-Abfrage fehlgeschlagen")
                return
            whois_data = result.stdout
            if 'No match' in whois_data or 'NOT FOUND' in whois_data:
                print_error(f"Keine WHOIS-Daten für {domain_to_check} gefunden")
                return

            whois = self.results['whois']
            whois['domain_checked'] = domain_to_check
            if is_subdomain:
                whois['note'] = f"Subdomain - WHOIS von Hauptdomain {domain_to_check}"
            self._extract_whois_fields(whois_data)
            self._print_registrant_summary()
            whois['raw'] = whois_data
        except FileNotFoundError:
            print_error("whois-Tool nicht installiert")
            print_info("Installation", "sudo pacman -S whois")
        except subprocess.TimeoutExpired:
            print_error("WHOIS-Abfrage Timeout")
        except Exception as e:
            print_error(f"WHOIS-Fehler: {str(e)}")

    def _extract_whois_fields(self, whois_data: str) -> None:
        """Parse *whois_data* with ``_WHOIS_PATTERNS``; store and print each field found."""
        whois = self.results['whois']
        flags = re.IGNORECASE | re.MULTILINE
        for key, pattern_list in self._WHOIS_PATTERNS.items():
            label = key.replace('_', ' ').title()
            for pattern in pattern_list:
                values = [m.strip() for m in re.findall(pattern, whois_data, flags)]
                values = [v for v in values if v and v != 'REDACTED FOR PRIVACY']
                if not values:
                    continue  # try the next spelling of this field
                whois[key] = values
                if key == 'name_servers':
                    print_info(label, f"{len(values)} Server")
                    for ns in values[:5]:
                        print(f" {ns}")
                elif key == 'status':
                    print_info(label, f"{len(values)} Status(se)")
                    for status in values[:3]:
                        print(f" {status}")
                else:
                    print_info(label, values[0])
                break  # first pattern with usable data wins

    def _print_registrant_summary(self) -> None:
        """Print the highlighted owner banner from the parsed WHOIS fields."""
        whois = self.results['whois']
        print()
        print_header("🔍 DOMAIN-INHABER")
        if 'registrant_organization' in whois:
            print_success(f"Organisation: {whois['registrant_organization'][0]}")
        elif 'registrant_name' in whois:
            print_success(f"Name: {whois['registrant_name'][0]}")
        else:
            print_warning("Inhaber nicht identifizierbar (möglicherweise durch Privacy Protection verborgen)")
        if 'registrant_country' in whois:
            print_info("Land", whois['registrant_country'][0])
        if 'registrant_email' in whois:
            email = whois['registrant_email'][0]
            # Skip addresses that look like privacy-proxy placeholders.
            if 'contact-form' not in email and 'privacy' not in email.lower():
                print_info("Email", email)
        if 'registrar' in whois:
            print_info("Registrar", whois['registrar'][0])

    def analyze_ips(self):
        """Look up details for every A/AAAA address and append them to ``results['ips']``."""
        print_section("IP-Adress-Analyse")
        dns_records = self.results['dns']
        ips = dns_records.get('A', []) + dns_records.get('AAAA', [])
        for ip in ips:
            ip_info = self.get_ip_info(ip)
            self.results['ips'].append(ip_info)
            print_info("IP", ip)
            if ip_info.get('reverse_dns'):
                print(f" Reverse DNS: {ip_info['reverse_dns']}")
            if ip_info.get('asn'):
                print(f" ASN: {ip_info['asn']}")
            if ip_info.get('country'):
                print(f" Land: {ip_info['country']}")

    def get_ip_info(self, ip: str) -> Dict[str, Any]:
        """Return reverse DNS plus WHOIS-derived ASN, organisation and country for *ip*.

        The result always contains ``ip`` and ``reverse_dns`` (``None`` when
        the PTR lookup fails); ``asn``, ``organization`` and ``country`` are
        present only when found in the WHOIS output. Lookup failures are
        swallowed on purpose: this is best-effort enrichment.
        """
        info: Dict[str, Any] = {'ip': ip}

        try:
            rev_name = dns.reversename.from_address(ip)
            info['reverse_dns'] = str(dns.resolver.resolve(rev_name, "PTR")[0])
        except Exception:
            info['reverse_dns'] = None

        try:
            result = self._run_whois(ip, timeout=10)
        except (OSError, subprocess.SubprocessError):
            return info
        if result.returncode != 0:
            return info

        whois_output = result.stdout
        flags = re.IGNORECASE | re.MULTILINE
        # Word boundaries keep "AS123" from matching inside longer tokens.
        asn_match = re.search(r'\bAS(\d+)\b', whois_output)
        if asn_match:
            info['asn'] = f"AS{asn_match.group(1)}"
        # ARIN writes "OrgName:", RIPE-style registries write "org-name:".
        org_match = re.search(r'^(?:OrgName|org-name):[ \t]*(.+)$', whois_output, flags)
        if org_match:
            info['organization'] = org_match.group(1).strip()
        # The key is capitalised by ARIN and lower-case in RIPE-style output.
        country_match = re.search(r'^country:[ \t]*([A-Za-z]{2})\b', whois_output, flags)
        if country_match:
            info['country'] = country_match.group(1).upper()
        return info

    def check_email_security(self):
        """Check SPF, DMARC and common DKIM selectors; store them in ``results['email_security']``.

        SPF is taken from the TXT records ``analyze_dns`` collected; DMARC
        and DKIM are resolved here. Missing records are stored as ``None``
        (SPF, DMARC) or an empty list (DKIM).
        """
        print_section("Email-Sicherheit")
        security = self.results['email_security']

        # SPF: first TXT record carrying an SPF policy.
        spf_record = next(
            (txt for txt in self.results['dns'].get('TXT', []) if 'v=spf1' in txt.lower()),
            None,
        )
        if spf_record:
            print_success("SPF: Vorhanden")
            print(f" {spf_record}")
        else:
            print_warning("SPF: Nicht gefunden!")
        security['spf'] = spf_record

        # DMARC lives in a TXT record at _dmarc.<domain>.
        try:
            dmarc_answers = dns.resolver.resolve(f'_dmarc.{self.domain}', 'TXT')
            dmarc_record = str(dmarc_answers[0])
            print_success("DMARC: Vorhanden")
            print(f" {dmarc_record}")
            security['dmarc'] = dmarc_record
        except Exception:
            print_warning("DMARC: Nicht gefunden!")
            security['dmarc'] = None

        # DKIM selectors cannot be enumerated, so only frequent names are tried.
        print_info("DKIM", "Teste häufige Selektoren...")
        dkim_selectors = ['default', 'google', 'k1', 'k2', 'selector1', 'selector2', 'mail', 'dkim']
        dkim_found = []
        for selector in dkim_selectors:
            try:
                dkim_answers = dns.resolver.resolve(f'{selector}._domainkey.{self.domain}', 'TXT')
            except Exception:
                continue  # selector not published
            dkim_found.append({'selector': selector, 'record': str(dkim_answers[0])})
            print(f"{selector}: Gefunden")
        if not dkim_found:
            print(" Keine DKIM-Records mit Standard-Selektoren gefunden")
        security['dkim'] = dkim_found

    def scan_common_ports(self):
        """TCP-connect scan of ``COMMON_PORTS`` on the first A record.

        Stores the open ports, sorted by number, as ``{'port', 'service'}``
        dicts under ``results['ports'][ip]``.
        """
        print_section("Port-Scan (häufige Ports)")
        ips_to_scan = self.results['dns'].get('A', [])
        if not ips_to_scan:
            print_warning("Keine IPv4-Adressen zum Scannen")
            return

        for ip in ips_to_scan[:1]:  # only the first IP is scanned
            print_info("Scanne IP", ip)
            open_ports = []
            with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
                future_to_port = {
                    executor.submit(self.check_port, ip, port): port
                    for port in self.COMMON_PORTS
                }
                for future in concurrent.futures.as_completed(future_to_port):
                    port = future_to_port[future]
                    try:
                        is_open = future.result()
                    except Exception:
                        continue  # a failed probe counts as "not open"
                    if is_open:
                        service = self.COMMON_PORTS[port]
                        open_ports.append({'port': port, 'service': service})
                        print_success(f"Port {port} ({service}): OFFEN")
            # Futures finish in arbitrary order; sort for a stable result.
            open_ports.sort(key=lambda entry: entry['port'])
            self.results['ports'][ip] = open_ports

    def check_port(self, ip: str, port: int, timeout: float = 2.0) -> bool:
        """Return True if a TCP connection to *ip*:*port* succeeds within *timeout* seconds.

        IPv4 only. Any socket error or invalid port yields False.
        """
        try:
            # The context manager closes the socket even if connecting raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((ip, port)) == 0
        except (OSError, OverflowError, ValueError):
            return False

    def analyze_ssl_certificates(self):
        """Fetch the certificate from each open TLS port of the first A record.

        Certificates are appended to ``results['ssl_certificates']``.
        """
        print_section("SSL/TLS-Zertifikate")
        ips_to_check = self.results['dns'].get('A', [])
        for ip in ips_to_check[:1]:  # only the first IP
            for port in self.SSL_PORTS:
                if not self.check_port(ip, port, timeout=1.0):
                    continue
                cert_info = self.get_ssl_certificate(self.domain, port)
                if not cert_info:
                    continue
                self.results['ssl_certificates'].append(cert_info)
                print_info(f"Port {port}", "Zertifikat gefunden")
                if not cert_info.get('verified', True):
                    print_warning(" Zertifikat nicht vertrauenswürdig - Details nicht verfügbar")
                print(f" Aussteller: {cert_info.get('issuer', 'N/A')}")
                print(f" Gültig von: {cert_info.get('not_before', 'N/A')}")
                print(f" Gültig bis: {cert_info.get('not_after', 'N/A')}")
                if cert_info.get('san'):
                    print(f" SANs: {', '.join(cert_info['san'][:3])}...")

    def get_ssl_certificate(self, hostname: str, port: int = 443) -> Optional[Dict[str, Any]]:
        """Return certificate details for *hostname*:*port*, or None if no TLS handshake succeeds.

        The chain is verified against the system trust store first, because
        ``getpeercert()`` returns an empty dict for unvalidated certificates.
        Hostname mismatches are tolerated. If verification fails, an
        unverified handshake only confirms that a certificate is served:
        the entry then has ``verified`` False and empty detail fields.
        """
        def fetch(context: ssl.SSLContext, binary_form: bool):
            with socket.create_connection((hostname, port), timeout=5) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    return ssock.getpeercert(binary_form=binary_form)

        context = ssl.create_default_context()
        context.check_hostname = False
        verified = True
        try:
            try:
                cert = fetch(context, False) or {}
            except ssl.SSLCertVerificationError:
                # Untrusted, expired or self-signed: retry without verification.
                verified = False
                context.verify_mode = ssl.CERT_NONE
                if not fetch(context, True):
                    return None
                cert = {}
        except (OSError, ValueError):
            # Covers refused connections, timeouts, DNS and TLS protocol errors.
            return None

        # Each RDN is a tuple of (key, value) pairs; the first pair is used.
        subject = dict(rdn[0] for rdn in cert.get('subject', ()))
        issuer = dict(rdn[0] for rdn in cert.get('issuer', ()))
        return {
            'port': port,
            'subject': subject,
            'issuer': issuer.get('organizationName', 'N/A'),
            'version': cert.get('version'),
            'serial_number': cert.get('serialNumber'),
            'not_before': cert.get('notBefore'),
            'not_after': cert.get('notAfter'),
            'san': [value for _kind, value in cert.get('subjectAltName', ())],
            'verified': verified,
        }

    def analyze_http(self):
        """Request the site over HTTPS, falling back to HTTP, and store the response metadata.

        Only the first protocol that answers is recorded under
        ``results['http_info'][protocol]``. Needs the optional ``requests``
        package; without it the step is skipped with a warning.
        """
        print_section("HTTP/HTTPS-Analyse")
        try:
            import requests  # optional dependency, imported lazily
        except ImportError:
            print_warning("requests-Modul nicht installiert (pip install requests)")
            return

        interesting_headers = [
            'Server', 'X-Powered-By', 'Content-Security-Policy',
            'Strict-Transport-Security', 'X-Frame-Options'
        ]
        for protocol in ('https', 'http'):
            url = f"{protocol}://{self.domain}"
            try:
                response = requests.get(url, timeout=10, allow_redirects=True)
            except Exception:
                # Best effort: a failing HTTPS request silently falls back to HTTP.
                if protocol == 'http':
                    print_warning("HTTP nicht erreichbar")
                continue
            self.results['http_info'][protocol] = {
                'protocol': protocol,
                'status_code': response.status_code,
                'final_url': response.url,
                'headers': dict(response.headers),
            }
            print_info(protocol.upper(), f"Status {response.status_code}")
            for header in interesting_headers:
                if header in response.headers:
                    print(f" {header}: {response.headers[header]}")
            break  # HTTPS worked, so HTTP is not tried

    def find_subdomains(self):
        """Resolve a fixed list of common subdomain names and store those with A records.

        Hits are stored in ``results['subdomains']`` as ``{'subdomain', 'ips'}``.
        """
        print_section("Subdomain-Enumeration")
        common_subdomains = [
            'www', 'mail', 'webmail', 'smtp', 'pop', 'imap',
            'ftp', 'admin', 'portal', 'api', 'dev', 'test',
            'staging', 'blog', 'shop', 'store', 'cpanel'
        ]
        print_info("Teste", f"{len(common_subdomains)} häufige Subdomains")
        found = []
        for subdomain in common_subdomains:
            full_domain = f"{subdomain}.{self.domain}"
            try:
                answers = dns.resolver.resolve(full_domain, 'A')
            except Exception:
                continue  # name does not resolve
            ips = [str(rdata) for rdata in answers]
            found.append({'subdomain': full_domain, 'ips': ips})
            print_success(f"{full_domain} → {', '.join(ips)}")
        self.results['subdomains'] = found
        if not found:
            print(" Keine Standard-Subdomains gefunden")

    def save_results(self, output_file: Optional[str] = None):
        """Write ``self.results`` to *output_file* as UTF-8 JSON; do nothing if it is falsy."""
        if not output_file:
            return
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        print_success(f"\nErgebnisse gespeichert in: {output_file}")
def main():
    """Command-line entry point: analyse one or more domains and optionally save JSON.

    Domains come from positional arguments and/or ``-f FILE`` (one per line;
    blank lines and ``#`` comments are skipped). With one domain the result
    goes to ``-o FILE`` if given; otherwise ``--output-dir`` receives one
    ``<domain>.json`` per analysed domain.

    In batch mode a domain whose analysis terminates early (the analyzer
    calls ``sys.exit`` for nonexistent domains) is skipped, and the process
    exits with status 1 once all domains are done. Exits with status 1 on
    missing input, unreadable files, Ctrl+C or unexpected errors.
    """
    parser = argparse.ArgumentParser(
        description='Domain Analyzer - Umfassende Domain-Analyse',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Beispiele:
%(prog)s example.com
%(prog)s example.com google.com github.com
%(prog)s -f domains.txt
%(prog)s example.com -o results.json
%(prog)s example.com --no-color
'''
    )
    parser.add_argument('domains', nargs='*', help='Domain(s) zum Analysieren')
    parser.add_argument('-f', '--file', help='Datei mit Domains (eine pro Zeile)')
    parser.add_argument('-o', '--output', help='JSON-Output-Datei (nur bei einer Domain)')
    parser.add_argument('--output-dir', help='Verzeichnis für JSON-Outputs (bei mehreren Domains)')
    parser.add_argument('--no-color', action='store_true', help='Keine Farben im Output')
    parser.add_argument('--delay', type=float, default=1.0, help='Verzögerung zwischen Domains in Sekunden (Standard: 1.0)')
    args = parser.parse_args()

    # Disable colours by blanking every escape sequence on the Colors class.
    if args.no_color:
        for attr in dir(Colors):
            if not attr.startswith('_'):
                setattr(Colors, attr, '')

    # Collect domains: file entries first, then positional arguments.
    domains = []
    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                for line in f:
                    entry = line.strip()
                    # Test the stripped text so indented comments are skipped too.
                    if entry and not entry.startswith('#'):
                        domains.append(entry)
        except FileNotFoundError:
            print_error(f"Datei nicht gefunden: {args.file}")
            sys.exit(1)
        except OSError as e:
            print_error(f"Datei konnte nicht gelesen werden: {args.file} ({e})")
            sys.exit(1)
    if args.domains:
        domains.extend(args.domains)
    if not domains:
        parser.print_help()
        print_error("\nFehler: Mindestens eine Domain erforderlich!")
        sys.exit(1)

    batch = len(domains) > 1
    if batch and args.output:
        print_warning("Hinweis: -o/--output wird bei mehreren Domains ignoriert (bitte --output-dir verwenden)")
    delay = max(0.0, args.delay)  # time.sleep rejects negative values
    failed = []

    try:
        if args.output_dir:
            os.makedirs(args.output_dir, exist_ok=True)

        all_results = []
        for idx, domain in enumerate(domains, 1):
            if batch:
                print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}")
                print(f"{Colors.BOLD}{Colors.BLUE}Domain {idx}/{len(domains)}: {domain}{Colors.END}")
                print(f"{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}")

            analyzer = DomainAnalyzer(domain)
            try:
                results = analyzer.analyze_all()
            except SystemExit:
                # The analyzer exits on nonexistent domains; in a batch that
                # must not abort the remaining domains.
                if not batch:
                    raise
                failed.append(domain)
                print_warning(f"Überspringe {domain} - Analyse abgebrochen")
            else:
                all_results.append(results)
                if not batch and args.output:
                    analyzer.save_results(args.output)
                elif args.output_dir:
                    file_name = domain.replace('/', '_').replace(os.sep, '_')
                    analyzer.save_results(os.path.join(args.output_dir, f"{file_name}.json"))

            # Pause between domains, but not after the last one.
            if idx < len(domains):
                time.sleep(delay)

        print_header("ALLE ANALYSEN ABGESCHLOSSEN")
        if batch:
            print_section("Zusammenfassung")
            print(f" Analysierte Domains: {len(domains)}")
            for result in all_results:
                whois = result.get('whois', {})
                owner = "Unbekannt"
                if 'registrant_organization' in whois:
                    owner = whois['registrant_organization'][0]
                elif 'registrant_name' in whois:
                    owner = whois['registrant_name'][0]
                print(f"\n {Colors.CYAN}{result['domain']}{Colors.END}")
                print(f" Inhaber: {owner}")
                if result.get('ips'):
                    ips = [ip['ip'] for ip in result['ips']]
                    print(f" IPs: {', '.join(ips[:2])}")
            if failed:
                print_warning(f"\n Fehlgeschlagen: {', '.join(failed)}")
    except KeyboardInterrupt:
        print_error("\n\nAbgebrochen durch Benutzer")
        sys.exit(1)
    except Exception as e:
        print_error(f"\nFehler: {str(e)}")
        traceback.print_exc()
        sys.exit(1)

    if failed:
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()