Files
shop-request-deep-analyse/requestanalyse.py

1570 lines
63 KiB
Python

#!/usr/bin/env python3
"""
Request-Analyse für JTL-Shop Domains - Python Version
Analysiert Apache/Nginx Logs mit detaillierter Bot-Erkennung und IP-Klassifizierung
"""
import sys
import os
import re
import gzip
import subprocess
import socket
import ipaddress
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from pathlib import Path
import argparse
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import functools
import signal
import tempfile
import json
# Basis-Pfad für vhosts
VHOSTS_BASE = "/var/www/vhosts"
# Cache für IP-Lookups (wird zwischen Threads geteilt)
ip_info_cache = {}
# Monatsnamen-Mapping für Apache-Logs
MONTH_NAMES = {
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4,
'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8,
'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
def discover_domains():
    """Return a sorted list of vhost domain names that have access log files.

    A domain qualifies when ``<VHOSTS_BASE>/<domain>/logs`` exists and
    contains at least one file matching ``access*log*``; hidden directories
    and the 'default'/'system' system directories are skipped.
    """
    base = Path(VHOSTS_BASE)
    if not base.exists():
        return []
    found = []
    for entry in base.iterdir():
        if not entry.is_dir():
            continue
        name = entry.name
        # Skip hidden and well-known system directories.
        if name.startswith('.') or name in ('default', 'system'):
            continue
        logs_dir = entry / "logs"
        # Only domains that actually have access logs are interesting.
        if logs_dir.is_dir() and any(logs_dir.glob("access*log*")):
            found.append(name)
    return sorted(found)
def select_domain_interactive(domains):
    """Interactively select one shop domain from *domains*.

    Returns a tuple ``(domain_or_None, interaction_log)``: the chosen domain
    (None on abort) plus a transcript of the whole dialogue so the analysis
    report can replay the interactive session.
    """
    interaction_log = []

    def emit(msg=""):
        # Mirror every line to stdout and to the session transcript.
        print(msg)
        interaction_log.append(msg)

    emit("\n" + "=" * 67)
    emit("VERFÜGBARE SHOPS/DOMAINS")
    emit("=" * 67)
    emit("")
    if not domains:
        emit(f"❌ Keine Domains mit Log-Dateien gefunden in {VHOSTS_BASE}")
        return None, interaction_log
    # Numbered list, annotated with the number of log files per domain.
    for i, domain in enumerate(domains, 1):
        logs_dir = Path(VHOSTS_BASE) / domain / "logs"
        log_count = len(list(logs_dir.glob("access*log*")))
        emit(f" {i:2d}. {domain:<40} ({log_count} Log-Dateien)")
    emit("")
    emit("Eingabemöglichkeiten:")
    emit(" - Nummer eingeben (z.B. '1' oder '2')")
    emit(" - Domain-Namen eingeben (z.B. 'taschengelddieb.de')")
    emit(" - Enter für Abbruch")
    emit("")
    while True:
        choice = input("🔍 Welchen Shop analysieren? ").strip()
        interaction_log.append(f"🔍 Welchen Shop analysieren? {choice}")
        if not choice:
            emit("Abbruch.")
            return None, interaction_log
        try:
            num = int(choice)
        except ValueError:
            # Not a number: try exact match, then case-insensitive substring.
            if choice in domains:
                emit(f"\n✅ Ausgewählt: {choice}")
                return choice, interaction_log
            matches = [d for d in domains if choice.lower() in d.lower()]
            if len(matches) == 1:
                emit(f"\n✅ Ausgewählt: {matches[0]}")
                return matches[0], interaction_log
            elif len(matches) > 1:
                emit(f"❌ Mehrere Treffer gefunden: {', '.join(matches)}")
                emit(" Bitte genauer spezifizieren.")
            else:
                emit(f"❌ Domain '{choice}' nicht gefunden.")
        else:
            if 1 <= num <= len(domains):
                selected = domains[num - 1]
                emit(f"\n✅ Ausgewählt: {selected}")
                return selected, interaction_log
            emit(f"❌ Ungültige Nummer. Bitte 1-{len(domains)} eingeben.")
def get_timespan_interactive():
    """Interactively ask for the analysis time span in hours.

    Returns ``(hours, interaction_log)``. Empty input defaults to 48 hours;
    a trailing 'h' suffix is accepted ('24h'); valid range is 1..8760.
    """
    interaction_log = []

    def emit(msg=""):
        # Mirror every line to stdout and to the session transcript.
        print(msg)
        interaction_log.append(msg)

    emit("\n" + "=" * 67)
    emit("ZEITSPANNE FÜR ANALYSE")
    emit("=" * 67)
    emit("")
    emit("Beispiele:")
    emit(" - '6' oder '6h' für die letzten 6 Stunden")
    emit(" - '24' oder '24h' für die letzten 24 Stunden")
    emit(" - '72' oder '72h' für die letzten 3 Tage")
    emit(" - Enter für Standard (48 Stunden)")
    emit("")
    while True:
        choice = input("⏰ Zeitspanne in Stunden: ").strip()
        interaction_log.append(f"⏰ Zeitspanne in Stunden: {choice}")
        if not choice:
            emit("✅ Verwende Standard: 48 Stunden")
            return 48, interaction_log
        # Allow a trailing 'h' suffix, e.g. '24h'.
        choice = choice.rstrip('h')
        try:
            hours = int(choice)
        except ValueError:
            emit("❌ Ungültige Eingabe. Bitte eine Zahl eingeben.")
            continue
        if hours <= 0:
            emit("❌ Zeitspanne muss größer als 0 sein.")
        elif hours > 8760:  # one year
            emit("❌ Zeitspanne zu groß (max. 8760 Stunden = 1 Jahr)")
        else:
            emit(f"✅ Zeitspanne: {hours} Stunden")
            return hours, interaction_log
def get_top_n_interactive():
    """Interactively ask how many top results each report section should show.

    Returns ``(top_n, interaction_log)``; ``top_n`` is None when the user
    wants all results (empty input, '0', 'all'/'alle', or any non-positive
    number).
    """
    interaction_log = []

    def emit(msg=""):
        # Mirror every line to stdout and to the session transcript.
        print(msg)
        interaction_log.append(msg)

    emit("\n" + "=" * 67)
    emit("ANZAHL DER ERGEBNISSE")
    emit("=" * 67)
    emit("")
    emit("Beispiele:")
    emit(" - '50' für Top 50 Ergebnisse")
    emit(" - '100' für Top 100 Ergebnisse")
    emit(" - '0' oder 'all' für ALLE Ergebnisse")
    emit(" - Enter für ALLE Ergebnisse")
    emit("")
    while True:
        choice = input("📊 Anzahl der Top-Ergebnisse: ").strip().lower()
        interaction_log.append(f"📊 Anzahl der Top-Ergebnisse: {choice}")
        if choice in ('', '0', 'all', 'alle'):
            emit("✅ Zeige ALLE Ergebnisse")
            return None, interaction_log
        try:
            top_n = int(choice)
        except ValueError:
            emit("❌ Ungültige Eingabe. Bitte eine Zahl eingeben oder 'all' für alle.")
            continue
        if top_n <= 0:
            emit("✅ Zeige ALLE Ergebnisse")
            return None, interaction_log
        emit(f"✅ Zeige Top {top_n} Ergebnisse")
        return top_n, interaction_log
def get_extreme_rate_threshold_interactive():
    """Interactively ask for the extreme request-rate threshold (requests/min).

    Returns ``(threshold, interaction_log)``. Empty input defaults to 60;
    valid range is 1..10000.
    """
    interaction_log = []

    def emit(msg=""):
        # Mirror every line to stdout and to the session transcript.
        print(msg)
        interaction_log.append(msg)

    emit("\n" + "=" * 67)
    emit("EXTREME REQUEST-RATE SCHWELLWERT")
    emit("=" * 67)
    emit("")
    emit("Ab welcher Request-Rate (Requests/Minute) soll eine IP")
    emit("als EXTREM eingestuft und zum sofortigen Block empfohlen werden?")
    emit("")
    emit("Beispiele:")
    emit(" - '60' = 1 Request pro Sekunde (Standard)")
    emit(" - '120' = 2 Requests pro Sekunde")
    emit(" - '250' = ~4 Requests pro Sekunde")
    emit(" - '600' = 10 Requests pro Sekunde")
    emit(" - Enter für Standard (60)")
    emit("")
    while True:
        choice = input("🔴 Extreme Rate Schwellwert (Requests/Minute): ").strip()
        interaction_log.append(f"🔴 Extreme Rate Schwellwert (Requests/Minute): {choice}")
        if not choice:
            emit("✅ Verwende Standard: 60 Requests/Minute")
            return 60, interaction_log
        try:
            threshold = int(choice)
        except ValueError:
            emit("❌ Ungültige Eingabe. Bitte eine Zahl eingeben.")
            continue
        if threshold <= 0:
            emit("❌ Schwellwert muss größer als 0 sein.")
        elif threshold > 10000:
            emit("❌ Schwellwert zu hoch (max. 10000)")
        else:
            reqs_per_sec = threshold / 60
            emit(f"✅ Extreme Rate Schwellwert: {threshold} Requests/Minute (~{reqs_per_sec:.1f}/Sekunde)")
            return threshold, interaction_log
class LogAnalyzer:
def __init__(self, hours=48, top_n=None, domain=None, extreme_rate_threshold=60, interactive_log=None):
if not domain:
raise ValueError("Domain muss angegeben werden!")
self.domain = domain
self.log_dir = f"/var/www/vhosts/{domain}/logs"
self.hours = hours
self.top_n = top_n
self.extreme_rate_threshold = extreme_rate_threshold
self.cutoff_time = datetime.now() - timedelta(hours=hours)
self.interactive_log = interactive_log or [] # Speichert interaktive Auswahl
# Anzahl der CPU-Cores für Parallel-Verarbeitung
self.total_cores = multiprocessing.cpu_count()
self.max_workers = max(1, self.total_cores - 4) # Reserve 4 Cores für System
# Datenstrukturen für Analyse
self.all_requests = []
self.ip_cache = {}
# Erstelle Ausgabedatei
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
script_dir = Path(__file__).parent
if self.top_n:
self.output_file = script_dir / f"shop_analyse_{hours}h_top{top_n}_{timestamp}.txt"
else:
self.output_file = script_dir / f"shop_analyse_{hours}h_ALL_{timestamp}.txt"
def print_and_log(self, message="", end="\n"):
"""Gibt Text sowohl auf Console als auch in Datei aus"""
print(message, end=end)
with open(self.output_file, 'a', encoding='utf-8') as f:
f.write(message + end)
def parse_apache_timestamp(self, timestamp_str):
"""Parst Apache Log Timestamps [DD/Mon/YYYY:HH:MM:SS +ZONE]"""
try:
# Format: [10/Jan/2024:15:30:45 +0100]
match = re.match(r'\[(\d{2})/(\w{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2})', timestamp_str)
if match:
day = int(match.group(1))
month = MONTH_NAMES.get(match.group(2), 1)
year = int(match.group(3))
hour = int(match.group(4))
minute = int(match.group(5))
second = int(match.group(6))
return datetime(year, month, day, hour, minute, second)
except Exception:
pass
return None
def parse_log_line(self, line):
"""Parst eine Apache/Nginx Log-Zeile"""
# Apache Combined Log Format
# IP - - [timestamp] "METHOD URL HTTP/X.X" status size "referer" "user-agent"
pattern = r'^(\S+) \S+ \S+ (\[[^\]]+\]) "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'
match = re.match(pattern, line)
if match:
ip = match.group(1)
timestamp_str = match.group(2)
request = match.group(3)
status = match.group(4)
size = match.group(5)
referer = match.group(6)
user_agent = match.group(7)
# Parse timestamp
timestamp = self.parse_apache_timestamp(timestamp_str)
# Parse request
request_match = re.match(r'^(\w+) (\S+)', request)
method = request_match.group(1) if request_match else ""
url = request_match.group(2) if request_match else ""
return {
'ip': ip,
'timestamp': timestamp,
'method': method,
'url': url,
'status': status,
'size': size,
'referer': referer,
'user_agent': user_agent,
'raw_line': line
}
return None
def process_log_file(self, filepath):
"""Verarbeitet eine einzelne Log-Datei"""
requests = []
try:
# Öffne Datei (gz oder normal)
if filepath.suffix == '.gz':
file_handle = gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore')
else:
file_handle = open(filepath, 'r', encoding='utf-8', errors='ignore')
with file_handle:
for line in file_handle:
parsed = self.parse_log_line(line.strip())
if parsed and parsed['timestamp']:
# Prüfe ob innerhalb des Zeitfensters
if parsed['timestamp'] >= self.cutoff_time:
requests.append(parsed)
except Exception as e:
self.print_and_log(f" Fehler beim Verarbeiten von {filepath}: {e}")
return requests
def get_ip_info(self, ip):
"""Holt detaillierte IP-Informationen via whois"""
# Check Cache
if ip in self.ip_cache:
return self.ip_cache[ip]
try:
# Versuche GeoIP
country = "??"
try:
result = subprocess.run(['geoiplookup', ip],
capture_output=True, text=True, timeout=2)
if result.returncode == 0 and 'GeoIP Country Edition:' in result.stdout:
country = result.stdout.split('GeoIP Country Edition:')[1].split(',')[0].strip()
except:
pass
# Versuche whois via cymru
asn = "?"
as_name = "Unknown"
try:
result = subprocess.run(['whois', '-h', 'whois.cymru.com', f' -v {ip}'],
capture_output=True, text=True, timeout=3)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if lines:
parts = lines[-1].split()
if len(parts) > 5:
asn = parts[0]
as_name = ' '.join(parts[5:])[:80]
except:
pass
# Fallback auf reguläres whois
if asn == "?" or asn == "NA":
try:
result = subprocess.run(['whois', ip],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
whois_text = result.stdout
# Extrahiere ASN
asn_match = re.search(r'(?:aut-num|origin|originas):\s*(?:AS)?(\d+)',
whois_text, re.IGNORECASE)
if asn_match:
asn = asn_match.group(1)
# Extrahiere Organisation
org_patterns = [
r'(?:org-name|orgname|organisation|organization):\s*(.+)',
r'descr:\s*(.+)',
r'netname:\s*(.+)'
]
for pattern in org_patterns:
org_match = re.search(pattern, whois_text, re.IGNORECASE)
if org_match:
as_name = org_match.group(1).strip()[:80]
break
except:
pass
info = f"{country} | AS{asn} {as_name}"
self.ip_cache[ip] = info
return info
except Exception as e:
info = f"?? | Lookup failed"
self.ip_cache[ip] = info
return info
def get_ip_info_batch(self, ips):
"""Holt IP-Informationen für mehrere IPs parallel"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.get_ip_info, ip): ip for ip in ips}
results = {}
for future in as_completed(futures):
ip = futures[future]
try:
results[ip] = future.result()
except Exception as e:
results[ip] = "?? | Lookup failed"
return results
def categorize_ip(self, ip_info):
"""Kategorisiert eine IP basierend auf whois-Informationen"""
info_lower = ip_info.lower()
if any(x in info_lower for x in ['amazon', 'aws', 'ec2', 'azure', 'google cloud',
'gcp', 'digitalocean', 'linode', 'vultr', 'ovh cloud']):
return 'cloud'
elif any(x in info_lower for x in ['datacenter', 'data center', 'hosting',
'server', 'colocation', 'colo']):
return 'datacenter'
elif any(x in info_lower for x in ['hetzner', 'contabo', 'netcup', 'strato',
'1und1', 'ionos']):
return 'hosting'
elif any(x in info_lower for x in ['vpn', 'proxy', 'private relay']):
return 'vpn'
elif any(x in info_lower for x in ['tor', 'tor-exit', 'anonymizer']):
return 'tor'
elif any(x in info_lower for x in ['telecom', 'telekom', 'vodafone', 'o2',
'kabel', 'broadband', 'dsl', 'cable',
'fiber', 'residential']):
return 'residential'
else:
return 'isp'
def is_bot_user_agent(self, user_agent):
"""Prüft ob User-Agent ein Bot ist"""
ua_lower = user_agent.lower()
bot_patterns = [
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python',
'go-http', 'java', 'apache', 'scrapy', 'requests', 'aiohttp',
'axios', 'node-fetch', 'http.rb', 'libwww', 'semrush', 'ahrefs',
'mj12bot', 'dotbot', 'yandex', 'baidu', 'bingbot', 'googlebot',
'duckduck', 'slurp', 'facebot', 'whatsapp', 'telegram', 'discord', 'slack'
]
return any(pattern in ua_lower for pattern in bot_patterns)
    def analyze(self):
        """Run the complete analysis and write the report.

        Loads all matching log files in parallel worker processes, keeps the
        requests inside the configured time window in self.all_requests, and
        then runs every report section (URLs, IPs, user agents, status codes,
        bots, rates, ...). Output goes to stdout and to self.output_file via
        print_and_log().
        """
        # Replay the interactive selection dialogue at the top of the report.
        if self.interactive_log:
            self.print_and_log("=" * 67)
            self.print_and_log("INTERAKTIVE SESSION-PROTOKOLL")
            self.print_and_log("=" * 67)
            for line in self.interactive_log:
                self.print_and_log(line)
            self.print_and_log("=" * 67)
            self.print_and_log()
        self.print_and_log("=" * 67)
        if self.top_n:
            self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, Top {self.top_n})")
        else:
            self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, ALLE Ergebnisse)")
        self.print_and_log("=" * 67)
        self.print_and_log()
        self.print_and_log(f"Analysiere Zeitraum: {self.cutoff_time.strftime('%d.%m.%Y %H:%M')} bis {datetime.now().strftime('%d.%m.%Y %H:%M')}")
        self.print_and_log()
        # 1. Collect all candidate log files for this vhost.
        self.print_and_log("Extrahiere Daten aus Logs...")
        self.print_and_log("-" * 67)
        log_files = []
        log_dir = Path(self.log_dir)
        # Current (uncompressed) logs
        for pattern in ['access_log', 'access_ssl_log', 'proxy_access_log', 'proxy_access_ssl_log']:
            if (log_dir / pattern).exists():
                log_files.append(log_dir / pattern)
        # Rotated, gzip-compressed logs
        for pattern in ['access_log.processed*.gz', 'access_ssl_log.processed*.gz']:
            log_files.extend(log_dir.glob(pattern))
        # Parse the files in parallel worker processes (one task per file).
        all_requests = []
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.process_log_file, f): f for f in log_files}
            for future in as_completed(futures):
                filepath = futures[future]
                try:
                    requests = future.result()
                    count = len(requests)
                    all_requests.extend(requests)
                    self.print_and_log(f" Verarbeite {filepath.name}... {count} Einträge")
                except Exception as e:
                    self.print_and_log(f" Fehler bei {filepath.name}: {e}")
        self.all_requests = all_requests
        total = len(self.all_requests)
        self.print_and_log()
        self.print_and_log("-" * 67)
        self.print_and_log("1. GESAMTZAHL DER REQUESTS")
        self.print_and_log("-" * 67)
        self.print_and_log()
        self.print_and_log(f"GESAMT: {total} Requests in den letzten {self.hours} Stunden")
        self.print_and_log(f"Durchschnitt: {total // self.hours} Requests/Stunde")
        self.print_and_log()
        # 2. Top URLs
        self.analyze_urls()
        # 3. Top IPs
        self.analyze_ips()
        # 4. User agents
        self.analyze_user_agents()
        # 5. Status codes
        self.analyze_status_codes()
        # 6. Request methods
        self.analyze_methods()
        # 7. Bots
        self.analyze_bots()
        # 8. Hourly distribution
        self.analyze_hourly_distribution()
        # 9. Suspicious activity
        self.analyze_suspicious_activity()
        # 10. IPs with many 404 errors
        self.analyze_404_ips()
        # 11. IP categorization
        self.analyze_ip_categories()
        # 12. Request-rate analysis
        self.analyze_request_rates()
        # 13. Bot-pattern analysis
        self.analyze_bot_patterns()
        self.print_and_log()
        self.print_and_log("=" * 67)
        self.print_and_log("Analyse abgeschlossen")
        self.print_and_log("=" * 67)
        self.print_and_log()
        self.print_and_log(f"Ausgabe wurde gespeichert in:")
        self.print_and_log(f" {self.output_file}")
def analyze_urls(self):
"""Analysiert die häufigsten URLs"""
self.print_and_log("-" * 67)
if self.top_n:
self.print_and_log(f"2. TOP {self.top_n} ANGEFRAGTE URLs/PFADE")
else:
self.print_and_log("2. ALLE ANGEFRAGTEN URLs/PFADE (sortiert nach Häufigkeit)")
self.print_and_log("-" * 67)
self.print_and_log()
# Zähle URLs und deren Top-IPs
url_counts = Counter()
url_ips = defaultdict(Counter)
for req in self.all_requests:
url = req['url']
ip = req['ip']
url_counts[url] += 1
url_ips[url][ip] += 1
# Sortiere und limitiere
top_urls = url_counts.most_common(self.top_n)
# Hole IP-Infos für Top-IPs
all_top_ips = set()
for url, _ in top_urls[:50]: # Nur für die ersten 50 URLs
if url in url_ips:
top_ip = url_ips[url].most_common(1)[0][0]
all_top_ips.add(top_ip)
ip_infos = self.get_ip_info_batch(list(all_top_ips))
# Zeige Ergebnisse
for url, count in top_urls:
if url in url_ips:
top_ip, top_count = url_ips[url].most_common(1)[0]
ip_info = ip_infos.get(top_ip, self.get_ip_info(top_ip))
self.print_and_log(f"{count:10d} {url[:80]:<80} (Top: {top_ip} x{top_count} - {ip_info})")
else:
self.print_and_log(f"{count:10d} {url}")
self.print_and_log()
def analyze_ips(self):
"""Analysiert die häufigsten IPs"""
self.print_and_log("-" * 67)
if self.top_n:
self.print_and_log(f"3. TOP {self.top_n} IP-ADRESSEN (potenzielle Bots)")
else:
self.print_and_log("3. ALLE IP-ADRESSEN (sortiert nach Häufigkeit)")
self.print_and_log("-" * 67)
self.print_and_log()
# Zähle IPs
ip_counts = Counter(req['ip'] for req in self.all_requests)
top_ips = ip_counts.most_common(self.top_n)
# Hole IP-Infos parallel
self.print_and_log(f"Sammle IP-Informationen für {len(top_ips)} IPs...")
self.print_and_log(f"(Parallel-Modus mit {self.max_workers} Cores)")
self.print_and_log()
ip_list = [ip for ip, _ in top_ips]
ip_infos = self.get_ip_info_batch(ip_list)
# Zeige Ergebnisse
for ip, count in top_ips:
info = ip_infos.get(ip, "Lookup fehlgeschlagen")
self.print_and_log(f"{count:10d} {ip:<15} ({info})")
self.print_and_log()
def analyze_user_agents(self):
"""Analysiert User-Agents"""
self.print_and_log("-" * 67)
if self.top_n:
self.print_and_log(f"4. USER-AGENTS (Top {self.top_n})")
else:
self.print_and_log("4. ALLE USER-AGENTS (sortiert nach Häufigkeit)")
self.print_and_log("-" * 67)
self.print_and_log()
# Zähle User-Agents und deren Top-IPs
ua_counts = Counter()
ua_ips = defaultdict(Counter)
for req in self.all_requests:
ua = req['user_agent']
ip = req['ip']
ua_counts[ua] += 1
ua_ips[ua][ip] += 1
top_uas = ua_counts.most_common(self.top_n)
# Hole IP-Infos für Top-IPs
all_top_ips = set()
for ua, _ in top_uas[:30]: # Nur für die ersten 30 User-Agents
if ua in ua_ips:
top_ip = ua_ips[ua].most_common(1)[0][0]
all_top_ips.add(top_ip)
ip_infos = self.get_ip_info_batch(list(all_top_ips))
# Zeige Ergebnisse
for ua, count in top_uas:
ua_display = ua[:100] + "..." if len(ua) > 100 else ua
self.print_and_log(f"{count:10d} {ua_display}")
if ua in ua_ips:
top_ip, top_count = ua_ips[ua].most_common(1)[0]
ip_info = ip_infos.get(top_ip, self.get_ip_info(top_ip))
self.print_and_log(f" (Top-IP: {top_ip} x{top_count} - {ip_info})")
self.print_and_log()
def analyze_status_codes(self):
"""Analysiert HTTP Status Codes"""
self.print_and_log("-" * 67)
self.print_and_log("5. HTTP-STATUS-CODES")
self.print_and_log("-" * 67)
self.print_and_log()
status_counts = Counter(req['status'] for req in self.all_requests)
for status, count in status_counts.most_common():
self.print_and_log(f"{count:10d} HTTP {status}")
self.print_and_log()
def analyze_methods(self):
"""Analysiert Request-Methoden"""
self.print_and_log("-" * 67)
self.print_and_log("6. REQUESTS NACH METHODE")
self.print_and_log("-" * 67)
self.print_and_log()
method_counts = Counter(req['method'] for req in self.all_requests if req['method'])
for method, count in method_counts.most_common():
self.print_and_log(f"{count:10d} {method}")
self.print_and_log()
def analyze_bots(self):
"""Analysiert Bot-Traffic"""
self.print_and_log("-" * 67)
if self.top_n:
self.print_and_log(f"7. TOP {self.top_n} BOTS (identifiziert via User-Agent)")
else:
self.print_and_log("7. ALLE BOTS (identifiziert via User-Agent)")
self.print_and_log("-" * 67)
self.print_and_log()
bot_counts = Counter()
for req in self.all_requests:
if self.is_bot_user_agent(req['user_agent']):
bot_counts[req['user_agent']] += 1
top_bots = bot_counts.most_common(self.top_n)
for bot, count in top_bots:
bot_display = bot[:80] + "..." if len(bot) > 80 else bot
self.print_and_log(f"{count:10d} {bot_display}")
self.print_and_log()
def analyze_hourly_distribution(self):
"""Analysiert zeitliche Verteilung"""
self.print_and_log("-" * 67)
self.print_and_log("8. REQUESTS PRO STUNDE (zeitliche Verteilung)")
self.print_and_log("-" * 67)
self.print_and_log()
hourly_counts = Counter()
for req in self.all_requests:
if req['timestamp']:
hour_key = req['timestamp'].strftime("%Y-%m-%d %H:00")
hourly_counts[hour_key] += 1
# Zeige die letzten 48 Stunden
for hour, count in sorted(hourly_counts.items())[-48:]:
self.print_and_log(f"{hour} {count:10d} Requests")
self.print_and_log()
def analyze_suspicious_activity(self):
"""Analysiert verdächtige Aktivitäten"""
self.print_and_log("-" * 67)
self.print_and_log("9. VERDÄCHTIGE AKTIVITÄTEN")
self.print_and_log("-" * 67)
self.print_and_log()
# 404-Fehler URLs
self.print_and_log("404-Fehler (häufigste nicht existierende Pfade):")
error_404_urls = Counter()
for req in self.all_requests:
if req['status'] == '404':
error_404_urls[req['url']] += 1
top_404s = error_404_urls.most_common(self.top_n if self.top_n else 20)
for url, count in top_404s:
self.print_and_log(f"{count:10d} {url}")
# POST-Requests
self.print_and_log()
self.print_and_log("Häufige POST-Requests mit IPs (potenzielle Brute-Force):")
post_ips = defaultdict(Counter)
for req in self.all_requests:
if req['method'] == 'POST':
post_ips[req['url']][req['ip']] += 1
# Aggregiere POST-Requests
post_counts = []
for url, ip_counter in post_ips.items():
for ip, count in ip_counter.items():
post_counts.append((count, ip, url))
post_counts.sort(reverse=True)
top_posts = post_counts[:self.top_n] if self.top_n else post_counts[:20]
if top_posts:
# Hole IP-Infos
post_ips_list = list(set(ip for _, ip, _ in top_posts))
ip_infos = self.get_ip_info_batch(post_ips_list)
for count, ip, url in top_posts:
info = ip_infos.get(ip, "Lookup fehlgeschlagen")
self.print_and_log(f"{count:10d} {ip:<15}{url}")
self.print_and_log(f" ({info})")
self.print_and_log()
def analyze_404_ips(self):
"""Analysiert IPs mit vielen 404-Fehlern"""
self.print_and_log("-" * 67)
if self.top_n:
self.print_and_log(f"10. TOP {self.top_n} IP-ADRESSEN MIT MEISTEN 404-FEHLERN")
else:
self.print_and_log("10. ALLE IP-ADRESSEN MIT 404-FEHLERN (sortiert nach Häufigkeit)")
self.print_and_log("-" * 67)
self.print_and_log()
error_404_ips = Counter()
for req in self.all_requests:
if req['status'] == '404':
error_404_ips[req['ip']] += 1
top_404_ips = error_404_ips.most_common(self.top_n)
if top_404_ips:
# Hole IP-Infos
ip_list = [ip for ip, _ in top_404_ips]
ip_infos = self.get_ip_info_batch(ip_list)
for ip, count in top_404_ips:
info = ip_infos.get(ip, "Lookup fehlgeschlagen")
self.print_and_log(f"{count:10d} {ip:<15} ({info})")
self.print_and_log()
def analyze_ip_categories(self):
"""Kategorisiert IPs nach Typ"""
self.print_and_log("-" * 67)
self.print_and_log("11. IP-KATEGORISIERUNG NACH TYP (Top 20 je Kategorie)")
self.print_and_log("-" * 67)
self.print_and_log()
self.print_and_log("Analysiere IP-Typen...")
# Sammle alle unique IPs
ip_counts = Counter(req['ip'] for req in self.all_requests)
unique_ips = list(ip_counts.keys())
self.print_and_log(f"Führe Parallel-Lookups für {len(unique_ips)} unique IPs durch (mit {self.max_workers} Cores)...")
# Hole IP-Infos
ip_infos = self.get_ip_info_batch(unique_ips)
# Kategorisiere IPs
categories = defaultdict(list)
for ip, count in ip_counts.items():
info = ip_infos.get(ip, "Unknown")
category = self.categorize_ip(info)
categories[category].append((count, ip, info))
# Zeige Kategorien
category_names = {
'cloud': 'CLOUD-PROVIDER (AWS, Azure, GCP, etc.)',
'datacenter': 'RECHENZENTREN / DATACENTER',
'hosting': 'HOSTING-PROVIDER',
'vpn': 'VPN / PROXY-DIENSTE',
'tor': 'TOR-NETZWERK',
'residential': 'PRIVAT-NUTZER / ISP (Telekom, Vodafone, etc.)',
'isp': 'SONSTIGE ISP'
}
for cat_key, cat_name in category_names.items():
self.print_and_log()
self.print_and_log(f"{cat_name}:")
if cat_key in categories:
sorted_ips = sorted(categories[cat_key], reverse=True)[:20]
for count, ip, info in sorted_ips:
self.print_and_log(f"{count:10d} {ip:<15} ({info})")
else:
self.print_and_log(" Keine gefunden")
self.print_and_log()
    def analyze_request_rates(self):
        """Section 12: per-IP request rates and rate-limiting recommendations.

        Only IPs with at least 100 requests in the window are considered; the
        50 busiest of those are measured with calculate_request_rate(). Based
        on the resulting rate distribution the method prints suggested rate
        limits and lists IPs above the configured extreme threshold.
        """
        self.print_and_log("-" * 67)
        self.print_and_log("12. REQUEST-RATE-ANALYSE (für Rate-Limiting)")
        self.print_and_log("-" * 67)
        self.print_and_log()
        self.print_and_log("Analysiere Request-Raten pro IP (Requests/Minute)...")
        self.print_and_log("Hilft bei der Entscheidung für angemessene Rate-Limits")
        self.print_and_log()
        # Consider only IPs with at least 100 requests.
        ip_counts = Counter(req['ip'] for req in self.all_requests)
        relevant_ips = [(count, ip) for ip, count in ip_counts.items() if count >= 100]
        relevant_ips.sort(reverse=True)
        # Compute rate statistics for (at most) the 50 busiest IPs.
        rate_analysis = []
        for count, ip in relevant_ips[:50]:
            max_rate, avg_rate, burst_count = self.calculate_request_rate(ip)
            if max_rate > 0:
                rate_analysis.append((max_rate, avg_rate, count, ip, burst_count))
        # Order by the busiest 60-second window.
        rate_analysis.sort(reverse=True)
        if rate_analysis:
            self.print_and_log("=" * 67)
            self.print_and_log("TOP IPS NACH MAXIMALER REQUEST-RATE")
            self.print_and_log("=" * 67)
            self.print_and_log()
            self.print_and_log(" IP | Max/Min | Avg/Min | Total | Bursts | Info")
            self.print_and_log(" " + "-" * 63)
            # whois info for the 20 highest-rate IPs only.
            ip_list = [ip for _, _, _, ip, _ in rate_analysis[:20]]
            ip_infos = self.get_ip_info_batch(ip_list)
            for max_rate, avg_rate, total, ip, burst_count in rate_analysis[:20]:
                info = ip_infos.get(ip, "Unknown")
                info_short = info[:40] + "..." if len(info) > 40 else info
                # Severity marker relative to the configured threshold.
                warning = ""
                if max_rate >= self.extreme_rate_threshold:
                    warning = "🔴"  # extreme (at/above configured threshold)
                elif max_rate >= self.extreme_rate_threshold * 0.5:
                    warning = "🟡"  # high (50% of threshold)
                elif max_rate >= self.extreme_rate_threshold * 0.25:
                    warning = "⚠️"  # elevated (25% of threshold)
                self.print_and_log(f" {warning:<2} {ip:<15} | {max_rate:7d} | {avg_rate:7.1f} | {total:5d} | {burst_count:6d} | {info_short}")
            # Summary statistics and recommendations.
            self.print_and_log()
            self.print_and_log("=" * 67)
            self.print_and_log("RATE-LIMITING EMPFEHLUNGEN")
            self.print_and_log("=" * 67)
            self.print_and_log()
            # Index-based (non-interpolated) percentiles of the max-rate distribution.
            all_max_rates = [r[0] for r in rate_analysis]
            if all_max_rates:
                percentile_50 = sorted(all_max_rates)[len(all_max_rates)//2]
                percentile_90 = sorted(all_max_rates)[int(len(all_max_rates)*0.9) if int(len(all_max_rates)*0.9) > 0 else 0]
                percentile_99 = sorted(all_max_rates)[int(len(all_max_rates)*0.99) if int(len(all_max_rates)*0.99) > 0 else -1]
                self.print_and_log("📊 Request-Rate Verteilung:")
                self.print_and_log(f" 50% der IPs: <= {percentile_50} Requests/Minute")
                self.print_and_log(f" 90% der IPs: <= {percentile_90} Requests/Minute")
                self.print_and_log(f" 99% der IPs: <= {percentile_99} Requests/Minute")
                self.print_and_log()
                # Recommendations driven by the 90th percentile.
                self.print_and_log("💡 Empfohlene Rate-Limits basierend auf Analyse:")
                self.print_and_log()
                self.print_and_log(f" Konfigurierter Extreme-Schwellwert: {self.extreme_rate_threshold} Requests/Minute")
                self.print_and_log()
                if percentile_90 < 10:
                    self.print_and_log(" ✅ NORMAL TRAFFIC: Die meisten IPs haben niedrige Raten")
                    self.print_and_log(" - Standard-Limit: 20-30 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 5-10 Requests/10 Sekunden")
                elif percentile_90 < 30:
                    self.print_and_log(" ⚠️ MODERATE TRAFFIC: Einige IPs zeigen erhöhte Aktivität")
                    self.print_and_log(" - Standard-Limit: 30-60 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 10-15 Requests/10 Sekunden")
                else:
                    self.print_and_log(" 🔴 HIGH TRAFFIC: Viele IPs mit hohen Request-Raten")
                    self.print_and_log(" - Standard-Limit: 60-120 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 20-30 Requests/10 Sekunden")
                self.print_and_log()
                self.print_and_log(" Zusätzliche Überlegungen:")
                self.print_and_log(" - Residential IPs: Großzügigere Limits")
                self.print_and_log(" - Cloud/Datacenter: Strengere Limits")
                self.print_and_log(" - Bekannte Bots: Sehr strenge Limits oder Block")
                # IPs above the extreme threshold: recommend an immediate block.
                extreme_ips = [ip for rate, _, _, ip, _ in rate_analysis if rate > self.extreme_rate_threshold]
                if extreme_ips:
                    self.print_and_log()
                    self.print_and_log(f" 🔴 IPs mit extremen Raten (>{self.extreme_rate_threshold}/min) - SOFORT BLOCKEN:")
                    for ip in extreme_ips[:10]:
                        info = ip_infos.get(ip, "Unknown")
                        self.print_and_log(f" - {ip}: {info[:50]}")
        else:
            self.print_and_log(" Keine IPs mit genügend Daten für Rate-Analyse gefunden")
        self.print_and_log()
def calculate_request_rate(self, ip):
"""Berechnet die maximale Request-Rate einer IP (Requests pro Minute)"""
# Sammle alle Timestamps für diese IP
timestamps = []
for req in self.all_requests:
if req['ip'] == ip and req['timestamp']:
timestamps.append(req['timestamp'])
if len(timestamps) < 2:
return 0, 0, 0 # max_rate, avg_rate, burst_count
timestamps.sort()
# Analysiere Requests in 60-Sekunden-Fenstern
max_requests_per_minute = 0
total_minutes = 0
burst_count = 0 # Anzahl der Minuten mit > 10 Requests
# Sliding window von 60 Sekunden
for i in range(len(timestamps)):
window_end = timestamps[i] + timedelta(seconds=60)
requests_in_window = 0
for j in range(i, len(timestamps)):
if timestamps[j] <= window_end:
requests_in_window += 1
else:
break
if requests_in_window > max_requests_per_minute:
max_requests_per_minute = requests_in_window
if requests_in_window > 10:
burst_count += 1
# Durchschnittliche Rate
total_duration = (timestamps[-1] - timestamps[0]).total_seconds()
if total_duration > 0:
avg_rate = (len(timestamps) * 60) / total_duration
else:
avg_rate = 0
return max_requests_per_minute, avg_rate, burst_count
def get_top_urls_for_ip(self, ip, limit=3):
"""Holt die Top-URLs für eine spezifische IP"""
url_counts = Counter()
for req in self.all_requests:
if req['ip'] == ip:
url_counts[req['url']] += 1
return url_counts.most_common(limit)
    def analyze_bot_patterns(self) -> None:
        """Extended bot-pattern analysis with block-list recommendations.

        Emits four report sections derived from ``self.all_requests``:
          1. IPs with many requests but low URL variety (bot heuristic).
          2. User agents shared across many IPs (distributed bots).
          3. IPs with a high 404 error ratio (vulnerability scanners).
          4. A deduplicated block-list recommendation combining 1 and 3,
             enriched with per-IP rate and provider information.

        Skips the whole analysis when fewer than 10 requests are available.
        All output is written via ``self.print_and_log``.
        """
        self.print_and_log("-" * 67)
        self.print_and_log("13. BOT-PATTERN-ANALYSE & ENTSCHEIDUNGSHILFE")
        self.print_and_log("-" * 67)
        self.print_and_log()
        # Guard: with too little data the heuristics below are meaningless.
        if len(self.all_requests) < 10:
            self.print_and_log("⚠️ WARNUNG: Zu wenig Daten für erweiterte Analyse (< 10 Requests)")
            self.print_and_log(" Überspringe Bot-Pattern-Analyse...")
            return
        self.print_and_log("Analysiere Bot-Verhaltensmuster für fundierte Block-Entscheidungen...")
        self.print_and_log()
        # 1. IPs with a high request count but little URL variety
        #    (real users browse many pages; bots often hammer a few URLs).
        self.print_and_log("=" * 67)
        self.print_and_log("VERDÄCHTIGE IPs: Hohe Request-Rate + geringe URL-Vielfalt")
        self.print_and_log("=" * 67)
        self.print_and_log("(Echte User besuchen viele verschiedene Seiten, Bots oft nur wenige)")
        self.print_and_log()
        # Per-IP aggregation: total request count and set of distinct URLs.
        ip_stats = defaultdict(lambda: {'count': 0, 'urls': set()})
        for req in self.all_requests:
            ip_stats[req['ip']]['count'] += 1
            ip_stats[req['ip']]['urls'].add(req['url'])
        low_variety_ips = []
        for ip, stats in ip_stats.items():
            # Only consider IPs with a meaningful amount of traffic.
            if stats['count'] > 100:
                unique_urls = len(stats['urls'])
                # Integer percentage of distinct URLs relative to total requests.
                ratio = (unique_urls * 100) // stats['count']
                if ratio < 5:
                    low_variety_ips.append((stats['count'], ip, unique_urls, ratio, stats['urls']))
        low_variety_ips.sort(reverse=True)
        if low_variety_ips:
            # Resolve provider/geo info for the top candidates in one batch.
            ip_list = [ip for _, ip, _, _, _ in low_variety_ips[:20]]
            ip_infos = self.get_ip_info_batch(ip_list)
            for count, ip, unique_urls, ratio, urls in low_variety_ips[:20]:
                info = ip_infos.get(ip, "Unknown")
                self.print_and_log(f"⚠️ {ip}: {count} requests, nur {unique_urls} unique URLs ({ratio}% Vielfalt)")
                self.print_and_log(f" {info}")
                self.print_and_log(" Top-URLs:")
                # Show the top-3 URLs for this IP.
                url_counts = Counter(req['url'] for req in self.all_requests if req['ip'] == ip)
                for url, url_count in url_counts.most_common(3):
                    self.print_and_log(f" {url_count:6d} x {url}")
                self.print_and_log()
        else:
            self.print_and_log(" Keine verdächtigen IPs gefunden")
        self.print_and_log()
        # 2. User-agent + IP combinations (fingerprinting): the same UA string
        #    appearing from many IPs suggests a distributed bot.
        self.print_and_log("=" * 67)
        self.print_and_log("USER-AGENT + IP KOMBINATIONEN (Bot-Fingerprinting)")
        self.print_and_log("=" * 67)
        self.print_and_log("(Gleicher User-Agent von vielen IPs = verteilter Bot)")
        self.print_and_log()
        # Per-UA aggregation: total request count and set of source IPs.
        ua_stats = defaultdict(lambda: {'count': 0, 'ips': set()})
        for req in self.all_requests:
            ua_stats[req['user_agent']]['count'] += 1
            ua_stats[req['user_agent']]['ips'].add(req['ip'])
        distributed_bots = []
        for ua, stats in ua_stats.items():
            unique_ips = len(stats['ips'])
            # Thresholds: > 20 distinct IPs and > 1000 total requests.
            if unique_ips > 20 and stats['count'] > 1000:
                distributed_bots.append((stats['count'], unique_ips, ua, stats['ips']))
        distributed_bots.sort(reverse=True)
        if distributed_bots:
            for total_requests, unique_ips, ua, ips in distributed_bots[:30]:
                ua_short = ua[:80] + "..." if len(ua) > 80 else ua
                avg_per_ip = total_requests / unique_ips
                self.print_and_log(f"🤖 {ua_short}")
                self.print_and_log(f" {total_requests} requests von {unique_ips} verschiedenen IPs (Ø {avg_per_ip:.1f} req/IP)")
                # Show the top-3 source IPs for this user agent.
                self.print_and_log(" Top-IPs:")
                ip_counts = Counter(req['ip'] for req in self.all_requests if req['user_agent'] == ua)
                top_ips = ip_counts.most_common(3)
                if top_ips:
                    ip_list = [ip for ip, _ in top_ips]
                    ip_infos = self.get_ip_info_batch(ip_list)
                    for ip_addr, ip_count in top_ips:
                        ip_info = ip_infos.get(ip_addr, "Unknown")[:60]
                        self.print_and_log(f" {ip_count:6d} x {ip_addr} ({ip_info})")
                self.print_and_log()
        else:
            self.print_and_log(" Keine verteilten Bots gefunden")
        self.print_and_log()
        # 3. Scanner activity (404 pattern): many 404s from one IP usually
        #    means it is probing for known vulnerable paths.
        self.print_and_log("=" * 67)
        self.print_and_log("SCANNER-AKTIVITÄT: 404-Fehler Pattern")
        self.print_and_log("=" * 67)
        self.print_and_log("(Viele 404-Fehler = Scanner suchen nach Schwachstellen)")
        self.print_and_log()
        # Per-IP aggregation of total requests vs. 404 responses.
        ip_404_stats = defaultdict(lambda: {'total': 0, 'errors': 0})
        for req in self.all_requests:
            ip_404_stats[req['ip']]['total'] += 1
            if req['status'] == '404':
                ip_404_stats[req['ip']]['errors'] += 1
        high_404_ips = []
        for ip, stats in ip_404_stats.items():
            # Require a baseline of > 50 requests before judging the ratio.
            if stats['total'] > 50 and stats['errors'] > 0:
                error_rate = (stats['errors'] * 100) // stats['total']
                if error_rate > 30:
                    high_404_ips.append((stats['total'], stats['errors'], error_rate, ip))
        # Sort by the error-rate percentage, highest first.
        high_404_ips.sort(key=lambda x: x[2], reverse=True)
        if high_404_ips:
            # Resolve provider/geo info for the top candidates in one batch.
            ip_list = [ip for _, _, _, ip in high_404_ips[:15]]
            ip_infos = self.get_ip_info_batch(ip_list)
            for total, errors, error_rate, ip in high_404_ips[:15]:
                info = ip_infos.get(ip, "Unknown")
                self.print_and_log(f"🔍 {ip}: {errors}/{total} requests sind 404-Fehler ({error_rate}%)")
                self.print_and_log(f" {info}")
                # Show the top-3 paths that produced 404s.
                self.print_and_log(" Gesuchte Pfade:")
                error_urls = Counter(req['url'] for req in self.all_requests
                                     if req['ip'] == ip and req['status'] == '404')
                for url, count in error_urls.most_common(3):
                    self.print_and_log(f" {count:6d} x {url}")
                self.print_and_log()
        else:
            self.print_and_log(" Keine IPs mit hoher 404-Rate gefunden")
        self.print_and_log()
        # 4. Recommended block list combining both heuristics above.
        self.print_and_log("=" * 67)
        self.print_and_log("EMPFOHLENE BLOCK-LISTE (basierend auf Analyse)")
        self.print_and_log("=" * 67)
        self.print_and_log()
        self.print_and_log("📋 IPs zum sofortigen Blockieren (hohe Konfidenz):")
        self.print_and_log(" (Datacenter + hohe Request-Rate + geringe Vielfalt ODER hohe 404-Rate)")
        self.print_and_log()
        # Combine candidates from both heuristics into one list.
        block_candidates = []
        # Low variety IPs
        for count, ip, unique_urls, ratio, _ in low_variety_ips[:30]:
            block_candidates.append((count, ip, 'low_variety', ratio))
        # High 404 IPs
        for total, errors, error_rate, ip in high_404_ips[:30]:
            block_candidates.append((total, ip, 'high_404', error_rate))
        # Deduplicate by IP (keeping the entry with the highest request
        # count, because the list is iterated in descending order).
        seen_ips = set()
        final_candidates = []
        for count, ip, reason, metric in sorted(block_candidates, reverse=True):
            if ip not in seen_ips:
                seen_ips.add(ip)
                final_candidates.append((count, ip, reason, metric))
        if final_candidates:
            # Resolve provider/geo info for the final candidates in one batch.
            ip_list = [ip for _, ip, _, _ in final_candidates[:30]]
            ip_infos = self.get_ip_info_batch(ip_list)
            for count, ip, reason, metric in final_candidates[:30]:
                info = ip_infos.get(ip, "Unknown")
                # Tag well-known datacenter/cloud providers (substring match
                # on the resolved IP info string).
                ip_type = ""
                if any(x in info.lower() for x in ['amazon', 'aws', 'azure', 'google cloud',
                                                   'digitalocean', 'datacenter', 'hosting', 'hetzner']):
                    ip_type = "[DATACENTER/CLOUD]"
                reason_text = ""
                if reason == 'low_variety':
                    reason_text = f"Geringe URL-Vielfalt ({metric}%)"
                elif reason == 'high_404':
                    reason_text = f"Hohe 404-Rate ({metric}%)"
                # Compute the per-IP request rate (max/avg per minute).
                max_rate, avg_rate, burst_count = self.calculate_request_rate(ip)
                rate_info = f"Max: {max_rate}/min, Avg: {avg_rate:.1f}/min"
                self.print_and_log(f" {ip:<15} {ip_type:<20} - {reason_text}")
                self.print_and_log(f" {count} requests | {info}")
                self.print_and_log(f" Request-Rate: {rate_info}")
                # Show the top requested URLs for this IP.
                top_urls = self.get_top_urls_for_ip(ip, 3)
                if top_urls:
                    self.print_and_log(" Top angefragte URLs:")
                    for url, url_count in top_urls:
                        url_display = url[:60] + "..." if len(url) > 60 else url
                        self.print_and_log(f" {url_count:6d}x {url_display}")
                # Warn when the rate exceeds (or approaches) the configured
                # extreme-rate threshold.
                if max_rate > self.extreme_rate_threshold:
                    self.print_and_log(f" ⚠️ WARNUNG: Sehr hohe Request-Rate! ({max_rate} Requests/Minute)")
                elif max_rate > self.extreme_rate_threshold * 0.75:
                    self.print_and_log(f" ⚠️ Erhöhte Request-Rate: {max_rate} Requests/Minute")
                self.print_and_log()
        else:
            self.print_and_log(" Keine verdächtigen IPs für Block-Liste gefunden")
        self.print_and_log()
        self.print_and_log("💡 HINWEIS: Überprüfe diese IPs manuell bevor du sie blockierst!")
        self.print_and_log(" - Residential IPs (Telekom, Vodafone) könnten echte User sein")
        self.print_and_log(" - Cloud/Datacenter IPs sind meist Bots")
def _run_analyzer(analyzer):
    """Run the given analyzer, mapping interrupts/errors to exit codes."""
    try:
        analyzer.analyze()
    except KeyboardInterrupt:
        print("\n\nAnalyse abgebrochen durch Benutzer.")
        sys.exit(0)
    except Exception as e:
        print(f"\nFehler während der Analyse: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


def _main_interactive():
    """Interactive mode: step-by-step configuration via prompts.

    Every prompt and answer is mirrored into ``all_interaction_logs`` so the
    analyzer can embed the dialogue in its report output.
    """
    # Collect all interaction log lines
    all_interaction_logs = []
    all_interaction_logs.append("\n" + "="*67)
    all_interaction_logs.append(" JTL-SHOP LOG-ANALYSE TOOL ".center(67))
    all_interaction_logs.append("="*67)
    all_interaction_logs.append("\nInteraktiver Modus - Keine Parameter angegeben")
    all_interaction_logs.append("Starte Schritt-für-Schritt Konfiguration...")
    print("\n" + "="*67)
    print(" JTL-SHOP LOG-ANALYSE TOOL ".center(67))
    print("="*67)
    print("\nInteraktiver Modus - Keine Parameter angegeben")
    print("Starte Schritt-für-Schritt Konfiguration...")
    # 1. Select domain
    domains = discover_domains()
    selected_domain, domain_log = select_domain_interactive(domains)
    all_interaction_logs.extend(domain_log)
    if not selected_domain:
        sys.exit(0)
    # 2. Choose timespan
    hours, timespan_log = get_timespan_interactive()
    all_interaction_logs.extend(timespan_log)
    # 3. Choose top-N
    top_n, topn_log = get_top_n_interactive()
    all_interaction_logs.extend(topn_log)
    # 4. Choose extreme-rate threshold
    extreme_threshold, threshold_log = get_extreme_rate_threshold_interactive()
    all_interaction_logs.extend(threshold_log)
    # Configuration summary
    summary_lines = [
        "\n" + "="*67,
        "ANALYSE-KONFIGURATION",
        "="*67,
        f" Domain: {selected_domain}",
        f" Zeitspanne: {hours} Stunden",
    ]
    if top_n:
        summary_lines.append(f" Ergebnisse: Top {top_n}")
    else:
        summary_lines.append(f" Ergebnisse: ALLE")
    summary_lines.append(f" Extreme Rate: {extreme_threshold} Requests/Minute (~{extreme_threshold/60:.1f}/Sekunde)")
    summary_lines.append("="*67)
    for line in summary_lines:
        print(line)
        all_interaction_logs.append(line)
    confirm = input("\n📌 Analyse starten? [Y/n]: ").strip().lower()
    all_interaction_logs.append(f"\n📌 Analyse starten? [Y/n]: {confirm}")
    # Empty input counts as confirmation.
    if confirm and confirm not in ['y', 'yes', 'j', 'ja']:
        msg = "Abbruch."
        print(msg)
        all_interaction_logs.append(msg)
        sys.exit(0)
    msg = "\n🚀 Starte Analyse...\n"
    print(msg)
    all_interaction_logs.append(msg)
    # Run the analysis with the chosen parameters and the interaction log
    analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=selected_domain,
                           extreme_rate_threshold=extreme_threshold,
                           interactive_log=all_interaction_logs)
    _run_analyzer(analyzer)


def _main_cli():
    """Classic command-line mode with flexible positional arguments.

    Accepts ``<domain> <hours> [top_n]`` or ``<hours> [top_n]`` in any
    reasonable combination; a positional containing a dot is treated as a
    domain, one containing 'h'/'H' as a timespan.
    """
    parser = argparse.ArgumentParser(
        description='Request-Analyse für JTL-Shop mit Bot-Erkennung',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Verwendung:
  INTERAKTIVER MODUS (empfohlen):
    %(prog)s                          # Startet interaktive Shop-Auswahl und Konfiguration
  KOMMANDOZEILEN-MODUS:
    %(prog)s <domain> <stunden> [top_n]
    %(prog)s <stunden> [top_n]        # Nutzt Standard-Domain
Beispiele:
    %(prog)s                          # Interaktiver Modus
    %(prog)s taschengelddieb.de 24h   # Spezifische Domain, 24h, alle Ergebnisse
    %(prog)s 24h                      # Standard-Domain, 24h, alle Ergebnisse
    %(prog)s 12h 50                   # Standard-Domain, 12h, Top 50
    %(prog)s shop.de 72h 100          # Spezifische Domain, 72h, Top 100
"""
    )
    # Flexible positional parsing: meaning is inferred from the values.
    parser.add_argument('arg1', nargs='?', help='Domain oder Zeitspanne')
    parser.add_argument('arg2', nargs='?', help='Zeitspanne oder Top-N')
    parser.add_argument('arg3', nargs='?', help='Top-N oder Extreme-Rate')
    parser.add_argument('arg4', nargs='?', help='Extreme-Rate (optional)')
    parser.add_argument('--extreme-rate', type=int, default=60,
                        help='Extreme Rate Schwellwert in Requests/Minute (Standard: 60)')
    args = parser.parse_args()
    # Intelligent argument interpretation
    domain = None
    timespan = None
    top_n = None
    extreme_rate = args.extreme_rate  # the --extreme-rate flag acts as default
    # arg1 containing a dot is a domain, otherwise a timespan.
    if args.arg1:
        if '.' in args.arg1:
            # It is a domain
            domain = args.arg1
            timespan = args.arg2
            if args.arg3:
                try:
                    top_n = int(args.arg3)
                except (ValueError, TypeError):
                    pass
        elif args.arg2 and not any(c in args.arg2 for c in ['h', 'H']):
            # arg1 is a timespan and arg2 looks like a top-N count.
            # BUGFIX: the timespan used to be discarded here (reset to None),
            # so "12h 50" silently fell back to the 48h default timespan.
            timespan = args.arg1
            try:
                top_n = int(args.arg2)
            except (ValueError, TypeError):
                pass
        else:
            # It is a timespan
            timespan = args.arg1
            if args.arg2:
                try:
                    top_n = int(args.arg2)
                except (ValueError, TypeError):
                    # Maybe it is a domain instead
                    if '.' in args.arg2:
                        domain = args.arg2
    # No domain given: use the only/default one or fall back to the menu.
    if not domain:
        domains = discover_domains()
        if len(domains) == 1:
            domain = domains[0]
            print(f"Verwende einzige verfügbare Domain: {domain}")
        elif 'taschengelddieb.de' in domains:
            domain = 'taschengelddieb.de'
            print(f"Verwende Standard-Domain: {domain}")
        else:
            # Interactive selection
            domain, _ = select_domain_interactive(domains)
            if not domain:
                sys.exit(0)
    # Parse the timespan ("24h" or plain "24"); default is 48 hours.
    if not timespan:
        hours = 48
        print(f"Keine Zeitspanne angegeben, nutze Standard: {hours}h")
    else:
        timespan = timespan.rstrip('hH')
        try:
            hours = int(timespan)
        except ValueError:
            print(f"Fehler: Ungültige Zeitspanne '{timespan}'")
            print("Nutze Format: <Stunden>h oder <Stunden>")
            sys.exit(1)
    # Print the effective configuration.
    if top_n:
        print(f"Starte Analyse für die letzten {hours} Stunden (Top {top_n})...")
    else:
        print(f"Starte Analyse für die letzten {hours} Stunden (ALLE Ergebnisse)...")
    print(f"Domain: {domain}")
    print(f"Extreme Rate Schwellwert: {extreme_rate} Requests/Minute")
    print(f"CPU-Cores: {multiprocessing.cpu_count()}")
    print()
    # Run the analysis
    analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=domain,
                           extreme_rate_threshold=extreme_rate)
    _run_analyzer(analyzer)


def main():
    """Entry point: interactive mode without CLI arguments, classic mode otherwise."""
    if len(sys.argv) == 1:
        _main_interactive()
    else:
        _main_cli()
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()