#!/usr/bin/env python3 """ Request-Analyse für JTL-Shop Domains - Python Version Analysiert Apache/Nginx Logs mit detaillierter Bot-Erkennung und IP-Klassifizierung """ import sys import os import re import gzip import subprocess import socket import ipaddress from datetime import datetime, timedelta from collections import defaultdict, Counter from pathlib import Path import argparse import multiprocessing from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed import functools import signal import tempfile import json # Basis-Pfad für vhosts VHOSTS_BASE = "/var/www/vhosts" # Cache für IP-Lookups (wird zwischen Threads geteilt) ip_info_cache = {} # Monatsnamen-Mapping für Apache-Logs MONTH_NAMES = { 'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12 } def discover_domains(): """Entdeckt alle verfügbaren Domains/Shops im vhosts Verzeichnis""" domains = [] vhosts_path = Path(VHOSTS_BASE) if not vhosts_path.exists(): return domains # Durchsuche alle Verzeichnisse in vhosts for domain_dir in vhosts_path.iterdir(): if domain_dir.is_dir(): # Prüfe ob ein logs-Verzeichnis existiert logs_dir = domain_dir / "logs" if logs_dir.exists() and logs_dir.is_dir(): # Prüfe ob Log-Dateien vorhanden sind log_files = list(logs_dir.glob("access*log*")) if log_files: domain_name = domain_dir.name # Filtere System-Verzeichnisse if not domain_name.startswith('.') and domain_name not in ['default', 'system']: domains.append(domain_name) return sorted(domains) def select_domain_interactive(domains): """Interaktive Domain-Auswahl mit Logging""" interaction_log = [] interaction_log.append("\n" + "="*67) interaction_log.append("VERFÜGBARE SHOPS/DOMAINS") interaction_log.append("="*67) interaction_log.append("") print("\n" + "="*67) print("VERFÜGBARE SHOPS/DOMAINS") print("="*67) print() if not domains: msg = f"❌ Keine Domains mit Log-Dateien gefunden in {VHOSTS_BASE}" print(msg) interaction_log.append(msg) return None, interaction_log # Zeige nummerierte Liste for i, domain in enumerate(domains, 1): # Prüfe Anzahl der Log-Dateien für zusätzliche Info logs_dir = Path(VHOSTS_BASE) / domain / "logs" log_count = len(list(logs_dir.glob("access*log*"))) line = f" {i:2d}. {domain:<40} ({log_count} Log-Dateien)" print(line) interaction_log.append(line) interaction_log.append("") interaction_log.append("Eingabemöglichkeiten:") interaction_log.append(" - Nummer eingeben (z.B. '1' oder '2')") interaction_log.append(" - Domain-Namen eingeben (z.B. 'taschengelddieb.de')") interaction_log.append(" - Enter für Abbruch") interaction_log.append("") print() print("Eingabemöglichkeiten:") print(" - Nummer eingeben (z.B. '1' oder '2')") print(" - Domain-Namen eingeben (z.B. 'taschengelddieb.de')") print(" - Enter für Abbruch") print() while True: choice = input("🔍 Welchen Shop analysieren? ").strip() interaction_log.append(f"🔍 Welchen Shop analysieren? {choice}") if not choice: msg = "Abbruch." print(msg) interaction_log.append(msg) return None, interaction_log # Prüfe ob Nummer eingegeben wurde try: num = int(choice) if 1 <= num <= len(domains): selected = domains[num - 1] msg = f"\n✅ Ausgewählt: {selected}" print(msg) interaction_log.append(msg) return selected, interaction_log else: msg = f"❌ Ungültige Nummer. Bitte 1-{len(domains)} eingeben." print(msg) interaction_log.append(msg) except ValueError: # Prüfe ob Domain-Name eingegeben wurde if choice in domains: msg = f"\n✅ Ausgewählt: {choice}" print(msg) interaction_log.append(msg) return choice, interaction_log # Prüfe ob Teil-Match existiert matches = [d for d in domains if choice.lower() in d.lower()] if len(matches) == 1: msg = f"\n✅ Ausgewählt: {matches[0]}" print(msg) interaction_log.append(msg) return matches[0], interaction_log elif len(matches) > 1: msg = f"❌ Mehrere Treffer gefunden: {', '.join(matches)}" print(msg) interaction_log.append(msg) print(" Bitte genauer spezifizieren.") interaction_log.append(" Bitte genauer spezifizieren.") else: msg = f"❌ Domain '{choice}' nicht gefunden." print(msg) interaction_log.append(msg) def get_timespan_interactive(): """Interaktive Zeitspannen-Eingabe mit Logging""" interaction_log = [] interaction_log.append("\n" + "="*67) interaction_log.append("ZEITSPANNE FÜR ANALYSE") interaction_log.append("="*67) interaction_log.append("") interaction_log.append("Beispiele:") interaction_log.append(" - '6' oder '6h' für die letzten 6 Stunden") interaction_log.append(" - '24' oder '24h' für die letzten 24 Stunden") interaction_log.append(" - '72' oder '72h' für die letzten 3 Tage") interaction_log.append(" - Enter für Standard (48 Stunden)") interaction_log.append("") print("\n" + "="*67) print("ZEITSPANNE FÜR ANALYSE") print("="*67) print() print("Beispiele:") print(" - '6' oder '6h' für die letzten 6 Stunden") print(" - '24' oder '24h' für die letzten 24 Stunden") print(" - '72' oder '72h' für die letzten 3 Tage") print(" - Enter für Standard (48 Stunden)") print() while True: choice = input("⏰ Zeitspanne in Stunden: ").strip() interaction_log.append(f"⏰ Zeitspanne in Stunden: {choice}") if not choice: msg = "✅ Verwende Standard: 48 Stunden" print(msg) interaction_log.append(msg) return 48, interaction_log # Entferne 'h' falls vorhanden choice = choice.rstrip('h') try: hours = int(choice) if hours <= 0: msg = "❌ Zeitspanne muss größer als 0 sein." print(msg) interaction_log.append(msg) elif hours > 8760: # 1 Jahr msg = "❌ Zeitspanne zu groß (max. 8760 Stunden = 1 Jahr)" print(msg) interaction_log.append(msg) else: msg = f"✅ Zeitspanne: {hours} Stunden" print(msg) interaction_log.append(msg) return hours, interaction_log except ValueError: msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben." print(msg) interaction_log.append(msg) def get_top_n_interactive(): """Interaktive Top-N Eingabe mit Logging""" interaction_log = [] interaction_log.append("\n" + "="*67) interaction_log.append("ANZAHL DER ERGEBNISSE") interaction_log.append("="*67) interaction_log.append("") interaction_log.append("Beispiele:") interaction_log.append(" - '50' für Top 50 Ergebnisse") interaction_log.append(" - '100' für Top 100 Ergebnisse") interaction_log.append(" - '0' oder 'all' für ALLE Ergebnisse") interaction_log.append(" - Enter für ALLE Ergebnisse") interaction_log.append("") print("\n" + "="*67) print("ANZAHL DER ERGEBNISSE") print("="*67) print() print("Beispiele:") print(" - '50' für Top 50 Ergebnisse") print(" - '100' für Top 100 Ergebnisse") print(" - '0' oder 'all' für ALLE Ergebnisse") print(" - Enter für ALLE Ergebnisse") print() while True: choice = input("📊 Anzahl der Top-Ergebnisse: ").strip().lower() interaction_log.append(f"📊 Anzahl der Top-Ergebnisse: {choice}") if not choice or choice == '0' or choice == 'all' or choice == 'alle': msg = "✅ Zeige ALLE Ergebnisse" print(msg) interaction_log.append(msg) return None, interaction_log try: top_n = int(choice) if top_n <= 0: msg = "✅ Zeige ALLE Ergebnisse" print(msg) interaction_log.append(msg) return None, interaction_log else: msg = f"✅ Zeige Top {top_n} Ergebnisse" print(msg) interaction_log.append(msg) return top_n, interaction_log except ValueError: msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben oder 'all' für alle." print(msg) interaction_log.append(msg) def get_extreme_rate_threshold_interactive(): """Interaktive Eingabe für extreme Request-Rate Schwellwert mit Logging""" interaction_log = [] interaction_log.append("\n" + "="*67) interaction_log.append("EXTREME REQUEST-RATE SCHWELLWERT") interaction_log.append("="*67) interaction_log.append("") interaction_log.append("Ab welcher Request-Rate (Requests/Minute) soll eine IP") interaction_log.append("als EXTREM eingestuft und zum sofortigen Block empfohlen werden?") interaction_log.append("") interaction_log.append("Beispiele:") interaction_log.append(" - '60' = 1 Request pro Sekunde (Standard)") interaction_log.append(" - '120' = 2 Requests pro Sekunde") interaction_log.append(" - '250' = ~4 Requests pro Sekunde") interaction_log.append(" - '600' = 10 Requests pro Sekunde") interaction_log.append(" - Enter für Standard (60)") interaction_log.append("") print("\n" + "="*67) print("EXTREME REQUEST-RATE SCHWELLWERT") print("="*67) print() print("Ab welcher Request-Rate (Requests/Minute) soll eine IP") print("als EXTREM eingestuft und zum sofortigen Block empfohlen werden?") print() print("Beispiele:") print(" - '60' = 1 Request pro Sekunde (Standard)") print(" - '120' = 2 Requests pro Sekunde") print(" - '250' = ~4 Requests pro Sekunde") print(" - '600' = 10 Requests pro Sekunde") print(" - Enter für Standard (60)") print() while True: choice = input("🔴 Extreme Rate Schwellwert (Requests/Minute): ").strip() interaction_log.append(f"🔴 Extreme Rate Schwellwert (Requests/Minute): {choice}") if not choice: msg = "✅ Verwende Standard: 60 Requests/Minute" print(msg) interaction_log.append(msg) return 60, interaction_log try: threshold = int(choice) if threshold <= 0: msg = "❌ Schwellwert muss größer als 0 sein." print(msg) interaction_log.append(msg) elif threshold > 10000: msg = "❌ Schwellwert zu hoch (max. 10000)" print(msg) interaction_log.append(msg) else: reqs_per_sec = threshold / 60 msg = f"✅ Extreme Rate Schwellwert: {threshold} Requests/Minute (~{reqs_per_sec:.1f}/Sekunde)" print(msg) interaction_log.append(msg) return threshold, interaction_log except ValueError: msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben." print(msg) interaction_log.append(msg) class LogAnalyzer: def __init__(self, hours=48, top_n=None, domain=None, extreme_rate_threshold=60, interactive_log=None): if not domain: raise ValueError("Domain muss angegeben werden!") self.domain = domain self.log_dir = f"/var/www/vhosts/{domain}/logs" self.hours = hours self.top_n = top_n self.extreme_rate_threshold = extreme_rate_threshold self.cutoff_time = datetime.now() - timedelta(hours=hours) self.interactive_log = interactive_log or [] # Speichert interaktive Auswahl # Anzahl der CPU-Cores für Parallel-Verarbeitung self.total_cores = multiprocessing.cpu_count() self.max_workers = max(1, self.total_cores - 4) # Reserve 4 Cores für System # Datenstrukturen für Analyse self.all_requests = [] self.ip_cache = {} # Erstelle Ausgabedatei timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") script_dir = Path(__file__).parent if self.top_n: self.output_file = script_dir / f"shop_analyse_{hours}h_top{top_n}_{timestamp}.txt" else: self.output_file = script_dir / f"shop_analyse_{hours}h_ALL_{timestamp}.txt" def print_and_log(self, message="", end="\n"): """Gibt Text sowohl auf Console als auch in Datei aus""" print(message, end=end) with open(self.output_file, 'a', encoding='utf-8') as f: f.write(message + end) def parse_apache_timestamp(self, timestamp_str): """Parst Apache Log Timestamps [DD/Mon/YYYY:HH:MM:SS +ZONE]""" try: # Format: [10/Jan/2024:15:30:45 +0100] match = re.match(r'\[(\d{2})/(\w{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2})', timestamp_str) if match: day = int(match.group(1)) month = MONTH_NAMES.get(match.group(2), 1) year = int(match.group(3)) hour = int(match.group(4)) minute = int(match.group(5)) second = int(match.group(6)) return datetime(year, month, day, hour, minute, second) except Exception: pass return None def parse_log_line(self, line): """Parst eine Apache/Nginx Log-Zeile""" # Apache Combined Log Format # IP - - [timestamp] "METHOD URL HTTP/X.X" status size "referer" "user-agent" pattern = r'^(\S+) \S+ \S+ (\[[^\]]+\]) "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"' match = re.match(pattern, line) if match: ip = match.group(1) timestamp_str = match.group(2) request = match.group(3) status = match.group(4) size = match.group(5) referer = match.group(6) user_agent = match.group(7) # Parse timestamp timestamp = self.parse_apache_timestamp(timestamp_str) # Parse request request_match = re.match(r'^(\w+) (\S+)', request) method = request_match.group(1) if request_match else "" url = request_match.group(2) if request_match else "" return { 'ip': ip, 'timestamp': timestamp, 'method': method, 'url': url, 'status': status, 'size': size, 'referer': referer, 'user_agent': user_agent, 'raw_line': line } return None def process_log_file(self, filepath): """Verarbeitet eine einzelne Log-Datei""" requests = [] try: # Öffne Datei (gz oder normal) if filepath.suffix == '.gz': file_handle = gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore') else: file_handle = open(filepath, 'r', encoding='utf-8', errors='ignore') with file_handle: for line in file_handle: parsed = self.parse_log_line(line.strip()) if parsed and parsed['timestamp']: # Prüfe ob innerhalb des Zeitfensters if parsed['timestamp'] >= self.cutoff_time: requests.append(parsed) except Exception as e: self.print_and_log(f" Fehler beim Verarbeiten von {filepath}: {e}") return requests def get_ip_info(self, ip): """Holt detaillierte IP-Informationen via whois""" # Check Cache if ip in self.ip_cache: return self.ip_cache[ip] try: # Versuche GeoIP country = "??" try: result = subprocess.run(['geoiplookup', ip], capture_output=True, text=True, timeout=2) if result.returncode == 0 and 'GeoIP Country Edition:' in result.stdout: country = result.stdout.split('GeoIP Country Edition:')[1].split(',')[0].strip() except: pass # Versuche whois via cymru asn = "?" as_name = "Unknown" try: result = subprocess.run(['whois', '-h', 'whois.cymru.com', f' -v {ip}'], capture_output=True, text=True, timeout=3) if result.returncode == 0: lines = result.stdout.strip().split('\n') if lines: parts = lines[-1].split() if len(parts) > 5: asn = parts[0] as_name = ' '.join(parts[5:])[:80] except: pass # Fallback auf reguläres whois if asn == "?" or asn == "NA": try: result = subprocess.run(['whois', ip], capture_output=True, text=True, timeout=5) if result.returncode == 0: whois_text = result.stdout # Extrahiere ASN asn_match = re.search(r'(?:aut-num|origin|originas):\s*(?:AS)?(\d+)', whois_text, re.IGNORECASE) if asn_match: asn = asn_match.group(1) # Extrahiere Organisation org_patterns = [ r'(?:org-name|orgname|organisation|organization):\s*(.+)', r'descr:\s*(.+)', r'netname:\s*(.+)' ] for pattern in org_patterns: org_match = re.search(pattern, whois_text, re.IGNORECASE) if org_match: as_name = org_match.group(1).strip()[:80] break except: pass info = f"{country} | AS{asn} {as_name}" self.ip_cache[ip] = info return info except Exception as e: info = f"?? | Lookup failed" self.ip_cache[ip] = info return info def get_ip_info_batch(self, ips): """Holt IP-Informationen für mehrere IPs parallel""" with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {executor.submit(self.get_ip_info, ip): ip for ip in ips} results = {} for future in as_completed(futures): ip = futures[future] try: results[ip] = future.result() except Exception as e: results[ip] = "?? | Lookup failed" return results def categorize_ip(self, ip_info): """Kategorisiert eine IP basierend auf whois-Informationen""" info_lower = ip_info.lower() if any(x in info_lower for x in ['amazon', 'aws', 'ec2', 'azure', 'google cloud', 'gcp', 'digitalocean', 'linode', 'vultr', 'ovh cloud']): return 'cloud' elif any(x in info_lower for x in ['datacenter', 'data center', 'hosting', 'server', 'colocation', 'colo']): return 'datacenter' elif any(x in info_lower for x in ['hetzner', 'contabo', 'netcup', 'strato', '1und1', 'ionos']): return 'hosting' elif any(x in info_lower for x in ['vpn', 'proxy', 'private relay']): return 'vpn' elif any(x in info_lower for x in ['tor', 'tor-exit', 'anonymizer']): return 'tor' elif any(x in info_lower for x in ['telecom', 'telekom', 'vodafone', 'o2', 'kabel', 'broadband', 'dsl', 'cable', 'fiber', 'residential']): return 'residential' else: return 'isp' def is_bot_user_agent(self, user_agent): """Prüft ob User-Agent ein Bot ist""" ua_lower = user_agent.lower() bot_patterns = [ 'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python', 'go-http', 'java', 'apache', 'scrapy', 'requests', 'aiohttp', 'axios', 'node-fetch', 'http.rb', 'libwww', 'semrush', 'ahrefs', 'mj12bot', 'dotbot', 'yandex', 'baidu', 'bingbot', 'googlebot', 'duckduck', 'slurp', 'facebot', 'whatsapp', 'telegram', 'discord', 'slack' ] return any(pattern in ua_lower for pattern in bot_patterns) def analyze(self): """Hauptanalyse-Funktion""" # Schreibe interaktive Session-Log falls vorhanden if self.interactive_log: self.print_and_log("=" * 67) self.print_and_log("INTERAKTIVE SESSION-PROTOKOLL") self.print_and_log("=" * 67) for line in self.interactive_log: self.print_and_log(line) self.print_and_log("=" * 67) self.print_and_log() self.print_and_log("=" * 67) if self.top_n: self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, Top {self.top_n})") else: self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, ALLE Ergebnisse)") self.print_and_log("=" * 67) self.print_and_log() self.print_and_log(f"Analysiere Zeitraum: {self.cutoff_time.strftime('%d.%m.%Y %H:%M')} bis {datetime.now().strftime('%d.%m.%Y %H:%M')}") self.print_and_log() # 1. Lade alle Log-Dateien self.print_and_log("Extrahiere Daten aus Logs...") self.print_and_log("-" * 67) log_files = [] log_dir = Path(self.log_dir) # Normale Logs for pattern in ['access_log', 'access_ssl_log', 'proxy_access_log', 'proxy_access_ssl_log']: if (log_dir / pattern).exists(): log_files.append(log_dir / pattern) # Gezippte Logs for pattern in ['access_log.processed*.gz', 'access_ssl_log.processed*.gz']: log_files.extend(log_dir.glob(pattern)) # Verarbeite Logs parallel all_requests = [] with ProcessPoolExecutor(max_workers=self.max_workers) as executor: futures = {executor.submit(self.process_log_file, f): f for f in log_files} for future in as_completed(futures): filepath = futures[future] try: requests = future.result() count = len(requests) all_requests.extend(requests) self.print_and_log(f" Verarbeite {filepath.name}... {count} Einträge") except Exception as e: self.print_and_log(f" Fehler bei {filepath.name}: {e}") self.all_requests = all_requests total = len(self.all_requests) self.print_and_log() self.print_and_log("-" * 67) self.print_and_log("1. GESAMTZAHL DER REQUESTS") self.print_and_log("-" * 67) self.print_and_log() self.print_and_log(f"GESAMT: {total} Requests in den letzten {self.hours} Stunden") self.print_and_log(f"Durchschnitt: {total // self.hours} Requests/Stunde") self.print_and_log() # 2. Top URLs self.analyze_urls() # 3. Top IPs self.analyze_ips() # 4. User-Agents self.analyze_user_agents() # 5. Status Codes self.analyze_status_codes() # 6. Request Methods self.analyze_methods() # 7. Bots self.analyze_bots() # 8. Zeitliche Verteilung self.analyze_hourly_distribution() # 9. Verdächtige Aktivitäten self.analyze_suspicious_activity() # 10. 404-Fehler IPs self.analyze_404_ips() # 11. IP-Kategorisierung self.analyze_ip_categories() # 12. Request-Rate-Analyse self.analyze_request_rates() # 13. Bot-Pattern-Analyse self.analyze_bot_patterns() self.print_and_log() self.print_and_log("=" * 67) self.print_and_log("Analyse abgeschlossen") self.print_and_log("=" * 67) self.print_and_log() self.print_and_log(f"Ausgabe wurde gespeichert in:") self.print_and_log(f" {self.output_file}") def analyze_urls(self): """Analysiert die häufigsten URLs""" self.print_and_log("-" * 67) if self.top_n: self.print_and_log(f"2. TOP {self.top_n} ANGEFRAGTE URLs/PFADE") else: self.print_and_log("2. ALLE ANGEFRAGTEN URLs/PFADE (sortiert nach Häufigkeit)") self.print_and_log("-" * 67) self.print_and_log() # Zähle URLs und deren Top-IPs url_counts = Counter() url_ips = defaultdict(Counter) for req in self.all_requests: url = req['url'] ip = req['ip'] url_counts[url] += 1 url_ips[url][ip] += 1 # Sortiere und limitiere top_urls = url_counts.most_common(self.top_n) # Hole IP-Infos für Top-IPs all_top_ips = set() for url, _ in top_urls[:50]: # Nur für die ersten 50 URLs if url in url_ips: top_ip = url_ips[url].most_common(1)[0][0] all_top_ips.add(top_ip) ip_infos = self.get_ip_info_batch(list(all_top_ips)) # Zeige Ergebnisse for url, count in top_urls: if url in url_ips: top_ip, top_count = url_ips[url].most_common(1)[0] ip_info = ip_infos.get(top_ip, self.get_ip_info(top_ip)) self.print_and_log(f"{count:10d} {url[:80]:<80} (Top: {top_ip} x{top_count} - {ip_info})") else: self.print_and_log(f"{count:10d} {url}") self.print_and_log() def analyze_ips(self): """Analysiert die häufigsten IPs""" self.print_and_log("-" * 67) if self.top_n: self.print_and_log(f"3. TOP {self.top_n} IP-ADRESSEN (potenzielle Bots)") else: self.print_and_log("3. ALLE IP-ADRESSEN (sortiert nach Häufigkeit)") self.print_and_log("-" * 67) self.print_and_log() # Zähle IPs ip_counts = Counter(req['ip'] for req in self.all_requests) top_ips = ip_counts.most_common(self.top_n) # Hole IP-Infos parallel self.print_and_log(f"Sammle IP-Informationen für {len(top_ips)} IPs...") self.print_and_log(f"(Parallel-Modus mit {self.max_workers} Cores)") self.print_and_log() ip_list = [ip for ip, _ in top_ips] ip_infos = self.get_ip_info_batch(ip_list) # Zeige Ergebnisse for ip, count in top_ips: info = ip_infos.get(ip, "Lookup fehlgeschlagen") self.print_and_log(f"{count:10d} {ip:<15} ({info})") self.print_and_log() def analyze_user_agents(self): """Analysiert User-Agents""" self.print_and_log("-" * 67) if self.top_n: self.print_and_log(f"4. USER-AGENTS (Top {self.top_n})") else: self.print_and_log("4. ALLE USER-AGENTS (sortiert nach Häufigkeit)") self.print_and_log("-" * 67) self.print_and_log() # Zähle User-Agents und deren Top-IPs ua_counts = Counter() ua_ips = defaultdict(Counter) for req in self.all_requests: ua = req['user_agent'] ip = req['ip'] ua_counts[ua] += 1 ua_ips[ua][ip] += 1 top_uas = ua_counts.most_common(self.top_n) # Hole IP-Infos für Top-IPs all_top_ips = set() for ua, _ in top_uas[:30]: # Nur für die ersten 30 User-Agents if ua in ua_ips: top_ip = ua_ips[ua].most_common(1)[0][0] all_top_ips.add(top_ip) ip_infos = self.get_ip_info_batch(list(all_top_ips)) # Zeige Ergebnisse for ua, count in top_uas: ua_display = ua[:100] + "..." if len(ua) > 100 else ua self.print_and_log(f"{count:10d} {ua_display}") if ua in ua_ips: top_ip, top_count = ua_ips[ua].most_common(1)[0] ip_info = ip_infos.get(top_ip, self.get_ip_info(top_ip)) self.print_and_log(f" (Top-IP: {top_ip} x{top_count} - {ip_info})") self.print_and_log() def analyze_status_codes(self): """Analysiert HTTP Status Codes""" self.print_and_log("-" * 67) self.print_and_log("5. HTTP-STATUS-CODES") self.print_and_log("-" * 67) self.print_and_log() status_counts = Counter(req['status'] for req in self.all_requests) for status, count in status_counts.most_common(): self.print_and_log(f"{count:10d} HTTP {status}") self.print_and_log() def analyze_methods(self): """Analysiert Request-Methoden""" self.print_and_log("-" * 67) self.print_and_log("6. REQUESTS NACH METHODE") self.print_and_log("-" * 67) self.print_and_log() method_counts = Counter(req['method'] for req in self.all_requests if req['method']) for method, count in method_counts.most_common(): self.print_and_log(f"{count:10d} {method}") self.print_and_log() def analyze_bots(self): """Analysiert Bot-Traffic""" self.print_and_log("-" * 67) if self.top_n: self.print_and_log(f"7. TOP {self.top_n} BOTS (identifiziert via User-Agent)") else: self.print_and_log("7. ALLE BOTS (identifiziert via User-Agent)") self.print_and_log("-" * 67) self.print_and_log() bot_counts = Counter() for req in self.all_requests: if self.is_bot_user_agent(req['user_agent']): bot_counts[req['user_agent']] += 1 top_bots = bot_counts.most_common(self.top_n) for bot, count in top_bots: bot_display = bot[:80] + "..." if len(bot) > 80 else bot self.print_and_log(f"{count:10d} {bot_display}") self.print_and_log() def analyze_hourly_distribution(self): """Analysiert zeitliche Verteilung""" self.print_and_log("-" * 67) self.print_and_log("8. REQUESTS PRO STUNDE (zeitliche Verteilung)") self.print_and_log("-" * 67) self.print_and_log() hourly_counts = Counter() for req in self.all_requests: if req['timestamp']: hour_key = req['timestamp'].strftime("%Y-%m-%d %H:00") hourly_counts[hour_key] += 1 # Zeige die letzten 48 Stunden for hour, count in sorted(hourly_counts.items())[-48:]: self.print_and_log(f"{hour} {count:10d} Requests") self.print_and_log() def analyze_suspicious_activity(self): """Analysiert verdächtige Aktivitäten""" self.print_and_log("-" * 67) self.print_and_log("9. VERDÄCHTIGE AKTIVITÄTEN") self.print_and_log("-" * 67) self.print_and_log() # 404-Fehler URLs self.print_and_log("404-Fehler (häufigste nicht existierende Pfade):") error_404_urls = Counter() for req in self.all_requests: if req['status'] == '404': error_404_urls[req['url']] += 1 top_404s = error_404_urls.most_common(self.top_n if self.top_n else 20) for url, count in top_404s: self.print_and_log(f"{count:10d} {url}") # POST-Requests self.print_and_log() self.print_and_log("Häufige POST-Requests mit IPs (potenzielle Brute-Force):") post_ips = defaultdict(Counter) for req in self.all_requests: if req['method'] == 'POST': post_ips[req['url']][req['ip']] += 1 # Aggregiere POST-Requests post_counts = [] for url, ip_counter in post_ips.items(): for ip, count in ip_counter.items(): post_counts.append((count, ip, url)) post_counts.sort(reverse=True) top_posts = post_counts[:self.top_n] if self.top_n else post_counts[:20] if top_posts: # Hole IP-Infos post_ips_list = list(set(ip for _, ip, _ in top_posts)) ip_infos = self.get_ip_info_batch(post_ips_list) for count, ip, url in top_posts: info = ip_infos.get(ip, "Lookup fehlgeschlagen") self.print_and_log(f"{count:10d} {ip:<15} → {url}") self.print_and_log(f" ({info})") self.print_and_log() def analyze_404_ips(self): """Analysiert IPs mit vielen 404-Fehlern""" self.print_and_log("-" * 67) if self.top_n: self.print_and_log(f"10. TOP {self.top_n} IP-ADRESSEN MIT MEISTEN 404-FEHLERN") else: self.print_and_log("10. ALLE IP-ADRESSEN MIT 404-FEHLERN (sortiert nach Häufigkeit)") self.print_and_log("-" * 67) self.print_and_log() error_404_ips = Counter() for req in self.all_requests: if req['status'] == '404': error_404_ips[req['ip']] += 1 top_404_ips = error_404_ips.most_common(self.top_n) if top_404_ips: # Hole IP-Infos ip_list = [ip for ip, _ in top_404_ips] ip_infos = self.get_ip_info_batch(ip_list) for ip, count in top_404_ips: info = ip_infos.get(ip, "Lookup fehlgeschlagen") self.print_and_log(f"{count:10d} {ip:<15} ({info})") self.print_and_log() def analyze_ip_categories(self): """Kategorisiert IPs nach Typ""" self.print_and_log("-" * 67) self.print_and_log("11. IP-KATEGORISIERUNG NACH TYP (Top 20 je Kategorie)") self.print_and_log("-" * 67) self.print_and_log() self.print_and_log("Analysiere IP-Typen...") # Sammle alle unique IPs ip_counts = Counter(req['ip'] for req in self.all_requests) unique_ips = list(ip_counts.keys()) self.print_and_log(f"Führe Parallel-Lookups für {len(unique_ips)} unique IPs durch (mit {self.max_workers} Cores)...") # Hole IP-Infos ip_infos = self.get_ip_info_batch(unique_ips) # Kategorisiere IPs categories = defaultdict(list) for ip, count in ip_counts.items(): info = ip_infos.get(ip, "Unknown") category = self.categorize_ip(info) categories[category].append((count, ip, info)) # Zeige Kategorien category_names = { 'cloud': 'CLOUD-PROVIDER (AWS, Azure, GCP, etc.)', 'datacenter': 'RECHENZENTREN / DATACENTER', 'hosting': 'HOSTING-PROVIDER', 'vpn': 'VPN / PROXY-DIENSTE', 'tor': 'TOR-NETZWERK', 'residential': 'PRIVAT-NUTZER / ISP (Telekom, Vodafone, etc.)', 'isp': 'SONSTIGE ISP' } for cat_key, cat_name in category_names.items(): self.print_and_log() self.print_and_log(f"{cat_name}:") if cat_key in categories: sorted_ips = sorted(categories[cat_key], reverse=True)[:20] for count, ip, info in sorted_ips: self.print_and_log(f"{count:10d} {ip:<15} ({info})") else: self.print_and_log(" Keine gefunden") self.print_and_log() def analyze_request_rates(self): """Analysiert Request-Raten um Burst-Patterns zu erkennen""" self.print_and_log("-" * 67) self.print_and_log("12. REQUEST-RATE-ANALYSE (für Rate-Limiting)") self.print_and_log("-" * 67) self.print_and_log() self.print_and_log("Analysiere Request-Raten pro IP (Requests/Minute)...") self.print_and_log("Hilft bei der Entscheidung für angemessene Rate-Limits") self.print_and_log() # Sammle IPs mit mindestens 100 Requests ip_counts = Counter(req['ip'] for req in self.all_requests) relevant_ips = [(count, ip) for ip, count in ip_counts.items() if count >= 100] relevant_ips.sort(reverse=True) # Analysiere Top-IPs rate_analysis = [] for count, ip in relevant_ips[:50]: # Top 50 IPs analysieren max_rate, avg_rate, burst_count = self.calculate_request_rate(ip) if max_rate > 0: rate_analysis.append((max_rate, avg_rate, count, ip, burst_count)) # Sortiere nach maximaler Rate rate_analysis.sort(reverse=True) if rate_analysis: self.print_and_log("=" * 67) self.print_and_log("TOP IPS NACH MAXIMALER REQUEST-RATE") self.print_and_log("=" * 67) self.print_and_log() self.print_and_log(" IP | Max/Min | Avg/Min | Total | Bursts | Info") self.print_and_log(" " + "-" * 63) # Hole IP-Infos ip_list = [ip for _, _, _, ip, _ in rate_analysis[:20]] ip_infos = self.get_ip_info_batch(ip_list) for max_rate, avg_rate, total, ip, burst_count in rate_analysis[:20]: info = ip_infos.get(ip, "Unknown") info_short = info[:40] + "..." if len(info) > 40 else info # Warnung-Symbole basierend auf Rate warning = "" if max_rate >= self.extreme_rate_threshold: warning = "🔴" # Extrem hoch (über konfiguriertem Schwellwert) elif max_rate >= self.extreme_rate_threshold * 0.5: warning = "🟡" # Hoch (50% des Schwellwerts) elif max_rate >= self.extreme_rate_threshold * 0.25: warning = "⚠️" # Erhöht (25% des Schwellwerts) self.print_and_log(f" {warning:<2} {ip:<15} | {max_rate:7d} | {avg_rate:7.1f} | {total:5d} | {burst_count:6d} | {info_short}") # Statistiken self.print_and_log() self.print_and_log("=" * 67) self.print_and_log("RATE-LIMITING EMPFEHLUNGEN") self.print_and_log("=" * 67) self.print_and_log() # Berechne Perzentile all_max_rates = [r[0] for r in rate_analysis] if all_max_rates: percentile_50 = sorted(all_max_rates)[len(all_max_rates)//2] percentile_90 = sorted(all_max_rates)[int(len(all_max_rates)*0.9) if int(len(all_max_rates)*0.9) > 0 else 0] percentile_99 = sorted(all_max_rates)[int(len(all_max_rates)*0.99) if int(len(all_max_rates)*0.99) > 0 else -1] self.print_and_log("📊 Request-Rate Verteilung:") self.print_and_log(f" 50% der IPs: <= {percentile_50} Requests/Minute") self.print_and_log(f" 90% der IPs: <= {percentile_90} Requests/Minute") self.print_and_log(f" 99% der IPs: <= {percentile_99} Requests/Minute") self.print_and_log() # Empfehlungen self.print_and_log("💡 Empfohlene Rate-Limits basierend auf Analyse:") self.print_and_log() self.print_and_log(f" Konfigurierter Extreme-Schwellwert: {self.extreme_rate_threshold} Requests/Minute") self.print_and_log() if percentile_90 < 10: self.print_and_log(" ✅ NORMAL TRAFFIC: Die meisten IPs haben niedrige Raten") self.print_and_log(" - Standard-Limit: 20-30 Requests/Minute") self.print_and_log(" - Burst-Limit: 5-10 Requests/10 Sekunden") elif percentile_90 < 30: self.print_and_log(" ⚠️ MODERATE TRAFFIC: Einige IPs zeigen erhöhte Aktivität") self.print_and_log(" - Standard-Limit: 30-60 Requests/Minute") self.print_and_log(" - Burst-Limit: 10-15 Requests/10 Sekunden") else: self.print_and_log(" 🔴 HIGH TRAFFIC: Viele IPs mit hohen Request-Raten") self.print_and_log(" - Standard-Limit: 60-120 Requests/Minute") self.print_and_log(" - Burst-Limit: 20-30 Requests/10 Sekunden") self.print_and_log() self.print_and_log(" Zusätzliche Überlegungen:") self.print_and_log(" - Residential IPs: Großzügigere Limits") self.print_and_log(" - Cloud/Datacenter: Strengere Limits") self.print_and_log(" - Bekannte Bots: Sehr strenge Limits oder Block") # Zeige IPs die definitiv geblockt werden sollten extreme_ips = [ip for rate, _, _, ip, _ in rate_analysis if rate > self.extreme_rate_threshold] if extreme_ips: self.print_and_log() self.print_and_log(f" 🔴 IPs mit extremen Raten (>{self.extreme_rate_threshold}/min) - SOFORT BLOCKEN:") for ip in extreme_ips[:10]: info = ip_infos.get(ip, "Unknown") self.print_and_log(f" - {ip}: {info[:50]}") else: self.print_and_log(" Keine IPs mit genügend Daten für Rate-Analyse gefunden") self.print_and_log() def calculate_request_rate(self, ip): """Berechnet die maximale Request-Rate einer IP (Requests pro Minute)""" # Sammle alle Timestamps für diese IP timestamps = [] for req in self.all_requests: if req['ip'] == ip and req['timestamp']: timestamps.append(req['timestamp']) if len(timestamps) < 2: return 0, 0, 0 # max_rate, avg_rate, burst_count timestamps.sort() # Analysiere Requests in 60-Sekunden-Fenstern max_requests_per_minute = 0 total_minutes = 0 burst_count = 0 # Anzahl der Minuten mit > 10 Requests # Sliding window von 60 Sekunden for i in range(len(timestamps)): window_end = timestamps[i] + timedelta(seconds=60) requests_in_window = 0 for j in range(i, len(timestamps)): if timestamps[j] <= window_end: requests_in_window += 1 else: break if requests_in_window > max_requests_per_minute: max_requests_per_minute = requests_in_window if requests_in_window > 10: burst_count += 1 # Durchschnittliche Rate total_duration = (timestamps[-1] - timestamps[0]).total_seconds() if total_duration > 0: avg_rate = (len(timestamps) * 60) / total_duration else: avg_rate = 0 return max_requests_per_minute, avg_rate, burst_count def get_top_urls_for_ip(self, ip, limit=3): """Holt die Top-URLs für eine spezifische IP""" url_counts = Counter() for req in self.all_requests: if req['ip'] == ip: url_counts[req['url']] += 1 return url_counts.most_common(limit) def analyze_bot_patterns(self): """Erweiterte Bot-Pattern-Analyse""" self.print_and_log("-" * 67) self.print_and_log("13. BOT-PATTERN-ANALYSE & ENTSCHEIDUNGSHILFE") self.print_and_log("-" * 67) self.print_and_log() if len(self.all_requests) < 10: self.print_and_log("⚠️ WARNUNG: Zu wenig Daten für erweiterte Analyse (< 10 Requests)") self.print_and_log(" Überspringe Bot-Pattern-Analyse...") return self.print_and_log("Analysiere Bot-Verhaltensmuster für fundierte Block-Entscheidungen...") self.print_and_log() # 1. IPs mit hoher Request-Rate aber wenig URL-Varianz self.print_and_log("=" * 67) self.print_and_log("VERDÄCHTIGE IPs: Hohe Request-Rate + geringe URL-Vielfalt") self.print_and_log("=" * 67) self.print_and_log("(Echte User besuchen viele verschiedene Seiten, Bots oft nur wenige)") self.print_and_log() ip_stats = defaultdict(lambda: {'count': 0, 'urls': set()}) for req in self.all_requests: ip_stats[req['ip']]['count'] += 1 ip_stats[req['ip']]['urls'].add(req['url']) low_variety_ips = [] for ip, stats in ip_stats.items(): if stats['count'] > 100: unique_urls = len(stats['urls']) ratio = (unique_urls * 100) // stats['count'] if ratio < 5: low_variety_ips.append((stats['count'], ip, unique_urls, ratio, stats['urls'])) low_variety_ips.sort(reverse=True) if low_variety_ips: # Hole IP-Infos ip_list = [ip for _, ip, _, _, _ in low_variety_ips[:20]] ip_infos = self.get_ip_info_batch(ip_list) for count, ip, unique_urls, ratio, urls in low_variety_ips[:20]: info = ip_infos.get(ip, "Unknown") self.print_and_log(f"⚠️ {ip}: {count} requests, nur {unique_urls} unique URLs ({ratio}% Vielfalt)") self.print_and_log(f" {info}") self.print_and_log(" Top-URLs:") # Zeige Top-3 URLs url_counts = Counter(req['url'] for req in self.all_requests if req['ip'] == ip) for url, url_count in url_counts.most_common(3): self.print_and_log(f" {url_count:6d} x {url}") self.print_and_log() else: self.print_and_log(" Keine verdächtigen IPs gefunden") self.print_and_log() # 2. User-Agent + IP Kombinationen self.print_and_log("=" * 67) self.print_and_log("USER-AGENT + IP KOMBINATIONEN (Bot-Fingerprinting)") self.print_and_log("=" * 67) self.print_and_log("(Gleicher User-Agent von vielen IPs = verteilter Bot)") self.print_and_log() ua_stats = defaultdict(lambda: {'count': 0, 'ips': set()}) for req in self.all_requests: ua_stats[req['user_agent']]['count'] += 1 ua_stats[req['user_agent']]['ips'].add(req['ip']) distributed_bots = [] for ua, stats in ua_stats.items(): unique_ips = len(stats['ips']) if unique_ips > 20 and stats['count'] > 1000: distributed_bots.append((stats['count'], unique_ips, ua, stats['ips'])) distributed_bots.sort(reverse=True) if distributed_bots: for total_requests, unique_ips, ua, ips in distributed_bots[:30]: ua_short = ua[:80] + "..." if len(ua) > 80 else ua avg_per_ip = total_requests / unique_ips self.print_and_log(f"🤖 {ua_short}") self.print_and_log(f" {total_requests} requests von {unique_ips} verschiedenen IPs (Ø {avg_per_ip:.1f} req/IP)") # Zeige Top-3 IPs self.print_and_log(" Top-IPs:") ip_counts = Counter(req['ip'] for req in self.all_requests if req['user_agent'] == ua) top_ips = ip_counts.most_common(3) if top_ips: ip_list = [ip for ip, _ in top_ips] ip_infos = self.get_ip_info_batch(ip_list) for ip_addr, ip_count in top_ips: ip_info = ip_infos.get(ip_addr, "Unknown")[:60] self.print_and_log(f" {ip_count:6d} x {ip_addr} ({ip_info})") self.print_and_log() else: self.print_and_log(" Keine verteilten Bots gefunden") self.print_and_log() # 3. Scanner-Aktivität (404-Pattern) self.print_and_log("=" * 67) self.print_and_log("SCANNER-AKTIVITÄT: 404-Fehler Pattern") self.print_and_log("=" * 67) self.print_and_log("(Viele 404-Fehler = Scanner suchen nach Schwachstellen)") self.print_and_log() ip_404_stats = defaultdict(lambda: {'total': 0, 'errors': 0}) for req in self.all_requests: ip_404_stats[req['ip']]['total'] += 1 if req['status'] == '404': ip_404_stats[req['ip']]['errors'] += 1 high_404_ips = [] for ip, stats in ip_404_stats.items(): if stats['total'] > 50 and stats['errors'] > 0: error_rate = (stats['errors'] * 100) // stats['total'] if error_rate > 30: high_404_ips.append((stats['total'], stats['errors'], error_rate, ip)) high_404_ips.sort(key=lambda x: x[2], reverse=True) if high_404_ips: # Hole IP-Infos ip_list = [ip for _, _, _, ip in high_404_ips[:15]] ip_infos = self.get_ip_info_batch(ip_list) for total, errors, error_rate, ip in high_404_ips[:15]: info = ip_infos.get(ip, "Unknown") self.print_and_log(f"🔍 {ip}: {errors}/{total} requests sind 404-Fehler ({error_rate}%)") self.print_and_log(f" {info}") # Zeige Top-3 404-URLs self.print_and_log(" Gesuchte Pfade:") error_urls = Counter(req['url'] for req in self.all_requests if req['ip'] == ip and req['status'] == '404') for url, count in error_urls.most_common(3): self.print_and_log(f" {count:6d} x {url}") self.print_and_log() else: self.print_and_log(" Keine IPs mit hoher 404-Rate gefunden") self.print_and_log() # 4. Empfohlene Block-Liste self.print_and_log("=" * 67) self.print_and_log("EMPFOHLENE BLOCK-LISTE (basierend auf Analyse)") self.print_and_log("=" * 67) self.print_and_log() self.print_and_log("📋 IPs zum sofortigen Blockieren (hohe Konfidenz):") self.print_and_log(" (Datacenter + hohe Request-Rate + geringe Vielfalt ODER hohe 404-Rate)") self.print_and_log() # Kombiniere Kandidaten block_candidates = [] # Low variety IPs for count, ip, unique_urls, ratio, _ in low_variety_ips[:30]: block_candidates.append((count, ip, 'low_variety', ratio)) # High 404 IPs for total, errors, error_rate, ip in high_404_ips[:30]: block_candidates.append((total, ip, 'high_404', error_rate)) # Dedupliziere und sortiere seen_ips = set() final_candidates = [] for count, ip, reason, metric in sorted(block_candidates, reverse=True): if ip not in seen_ips: seen_ips.add(ip) final_candidates.append((count, ip, reason, metric)) if final_candidates: # Hole IP-Infos ip_list = [ip for _, ip, _, _ in final_candidates[:30]] ip_infos = self.get_ip_info_batch(ip_list) for count, ip, reason, metric in final_candidates[:30]: info = ip_infos.get(ip, "Unknown") # Prüfe ob Datacenter/Cloud ip_type = "" if any(x in info.lower() for x in ['amazon', 'aws', 'azure', 'google cloud', 'digitalocean', 'datacenter', 'hosting', 'hetzner']): ip_type = "[DATACENTER/CLOUD]" reason_text = "" if reason == 'low_variety': reason_text = f"Geringe URL-Vielfalt ({metric}%)" elif reason == 'high_404': reason_text = f"Hohe 404-Rate ({metric}%)" # Berechne Request-Rate max_rate, avg_rate, burst_count = self.calculate_request_rate(ip) rate_info = f"Max: {max_rate}/min, Avg: {avg_rate:.1f}/min" self.print_and_log(f" {ip:<15} {ip_type:<20} - {reason_text}") self.print_and_log(f" {count} requests | {info}") self.print_and_log(f" Request-Rate: {rate_info}") # Zeige Top-URLs für diese IP top_urls = self.get_top_urls_for_ip(ip, 3) if top_urls: self.print_and_log(" Top angefragte URLs:") for url, url_count in top_urls: url_display = url[:60] + "..." if len(url) > 60 else url self.print_and_log(f" {url_count:6d}x {url_display}") # Warnung bei sehr hohen Raten if max_rate > self.extreme_rate_threshold: self.print_and_log(f" ⚠️ WARNUNG: Sehr hohe Request-Rate! ({max_rate} Requests/Minute)") elif max_rate > self.extreme_rate_threshold * 0.75: self.print_and_log(f" ⚠️ Erhöhte Request-Rate: {max_rate} Requests/Minute") self.print_and_log() else: self.print_and_log(" Keine verdächtigen IPs für Block-Liste gefunden") self.print_and_log() self.print_and_log("💡 HINWEIS: Überprüfe diese IPs manuell bevor du sie blockierst!") self.print_and_log(" - Residential IPs (Telekom, Vodafone) könnten echte User sein") self.print_and_log(" - Cloud/Datacenter IPs sind meist Bots") def main(): """Hauptfunktion mit interaktivem Modus""" # Wenn keine Argumente angegeben, starte interaktiven Modus if len(sys.argv) == 1: # Sammle alle Interaktions-Logs all_interaction_logs = [] all_interaction_logs.append("\n" + "="*67) all_interaction_logs.append(" JTL-SHOP LOG-ANALYSE TOOL ".center(67)) all_interaction_logs.append("="*67) all_interaction_logs.append("\nInteraktiver Modus - Keine Parameter angegeben") all_interaction_logs.append("Starte Schritt-für-Schritt Konfiguration...") print("\n" + "="*67) print(" JTL-SHOP LOG-ANALYSE TOOL ".center(67)) print("="*67) print("\nInteraktiver Modus - Keine Parameter angegeben") print("Starte Schritt-für-Schritt Konfiguration...") # 1. Domain auswählen domains = discover_domains() selected_domain, domain_log = select_domain_interactive(domains) all_interaction_logs.extend(domain_log) if not selected_domain: sys.exit(0) # 2. Zeitspanne wählen hours, timespan_log = get_timespan_interactive() all_interaction_logs.extend(timespan_log) # 3. Top-N wählen top_n, topn_log = get_top_n_interactive() all_interaction_logs.extend(topn_log) # 4. Extreme Rate Schwellwert wählen extreme_threshold, threshold_log = get_extreme_rate_threshold_interactive() all_interaction_logs.extend(threshold_log) # Zusammenfassung summary_lines = [ "\n" + "="*67, "ANALYSE-KONFIGURATION", "="*67, f" Domain: {selected_domain}", f" Zeitspanne: {hours} Stunden", ] if top_n: summary_lines.append(f" Ergebnisse: Top {top_n}") else: summary_lines.append(f" Ergebnisse: ALLE") summary_lines.append(f" Extreme Rate: {extreme_threshold} Requests/Minute (~{extreme_threshold/60:.1f}/Sekunde)") summary_lines.append("="*67) for line in summary_lines: print(line) all_interaction_logs.append(line) confirm = input("\n📌 Analyse starten? [Y/n]: ").strip().lower() all_interaction_logs.append(f"\n📌 Analyse starten? [Y/n]: {confirm}") if confirm and confirm not in ['y', 'yes', 'j', 'ja']: msg = "Abbruch." print(msg) all_interaction_logs.append(msg) sys.exit(0) msg = "\n🚀 Starte Analyse...\n" print(msg) all_interaction_logs.append(msg) # Starte Analyse mit gewählten Parametern und Interaction-Log analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=selected_domain, extreme_rate_threshold=extreme_threshold, interactive_log=all_interaction_logs) try: analyzer.analyze() except KeyboardInterrupt: print("\n\nAnalyse abgebrochen durch Benutzer.") sys.exit(0) except Exception as e: print(f"\nFehler während der Analyse: {e}") import traceback traceback.print_exc() sys.exit(1) else: # Klassischer Modus mit Kommandozeilen-Argumenten parser = argparse.ArgumentParser( description='Request-Analyse für JTL-Shop mit Bot-Erkennung', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Verwendung: INTERAKTIVER MODUS (empfohlen): %(prog)s # Startet interaktive Shop-Auswahl und Konfiguration KOMMANDOZEILEN-MODUS: %(prog)s [top_n] %(prog)s [top_n] # Nutzt Standard-Domain Beispiele: %(prog)s # Interaktiver Modus %(prog)s taschengelddieb.de 24h # Spezifische Domain, 24h, alle Ergebnisse %(prog)s 24h # Standard-Domain, 24h, alle Ergebnisse %(prog)s 12h 50 # Standard-Domain, 12h, Top 50 %(prog)s shop.de 72h 100 # Spezifische Domain, 72h, Top 100 """ ) # Flexibles Parsing der Argumente parser.add_argument('arg1', nargs='?', help='Domain oder Zeitspanne') parser.add_argument('arg2', nargs='?', help='Zeitspanne oder Top-N') parser.add_argument('arg3', nargs='?', help='Top-N oder Extreme-Rate') parser.add_argument('arg4', nargs='?', help='Extreme-Rate (optional)') parser.add_argument('--extreme-rate', type=int, default=60, help='Extreme Rate Schwellwert in Requests/Minute (Standard: 60)') args = parser.parse_args() # Intelligente Argument-Interpretation domain = None timespan = None top_n = None extreme_rate = args.extreme_rate # Nutze den --extreme-rate flag als Default # Prüfe ob arg1 eine Domain ist (enthält Punkt) oder Zeitspanne if args.arg1: if '.' in args.arg1: # Es ist eine Domain domain = args.arg1 timespan = args.arg2 if args.arg3: try: top_n = int(args.arg3) except (ValueError, TypeError): pass elif args.arg2 and not any(c in args.arg2 for c in ['h', 'H']): # arg2 könnte top_n sein try: top_n = int(args.arg2) timespan = None except (ValueError, TypeError): pass else: # Es ist eine Zeitspanne timespan = args.arg1 if args.arg2: try: top_n = int(args.arg2) except (ValueError, TypeError): # Vielleicht ist es eine Domain if '.' in args.arg2: domain = args.arg2 # Falls keine Domain angegeben, versuche Standard oder zeige Auswahl if not domain: domains = discover_domains() if len(domains) == 1: domain = domains[0] print(f"Verwende einzige verfügbare Domain: {domain}") elif 'taschengelddieb.de' in domains: domain = 'taschengelddieb.de' print(f"Verwende Standard-Domain: {domain}") else: # Interaktive Auswahl domain, _ = select_domain_interactive(domains) if not domain: sys.exit(0) # Parse Zeitspanne if not timespan: hours = 48 print(f"Keine Zeitspanne angegeben, nutze Standard: {hours}h") else: timespan = timespan.rstrip('hH') try: hours = int(timespan) except ValueError: print(f"Fehler: Ungültige Zeitspanne '{timespan}'") print("Nutze Format: h oder ") sys.exit(1) # Info ausgeben if top_n: print(f"Starte Analyse für die letzten {hours} Stunden (Top {top_n})...") else: print(f"Starte Analyse für die letzten {hours} Stunden (ALLE Ergebnisse)...") print(f"Domain: {domain}") print(f"Extreme Rate Schwellwert: {extreme_rate} Requests/Minute") print(f"CPU-Cores: {multiprocessing.cpu_count()}") print() # Starte Analyse analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=domain, extreme_rate_threshold=extreme_rate) try: analyzer.analyze() except KeyboardInterrupt: print("\n\nAnalyse abgebrochen durch Benutzer.") sys.exit(0) except Exception as e: print(f"\nFehler während der Analyse: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()