#!/usr/bin/env python3
"""
Request analysis for JTL-Shop domains - Python version
Parses Apache/Nginx logs with detailed bot detection and IP classification
"""

import sys
import os
import re
import gzip
import subprocess
import socket
import ipaddress
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from pathlib import Path
import argparse
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import functools
import signal
import tempfile
import json

# Base path for the vhosts
VHOSTS_BASE = "/var/www/vhosts"

# Cache for IP lookups (shared between threads)
ip_info_cache = {}

# Month-name mapping for Apache logs
MONTH_NAMES = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4,
    'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8,
    'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
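
# A fixed month map is used rather than datetime.strptime with %b, presumably
# because %b is locale-dependent and would misparse the English abbreviations
# on a server running a non-English locale.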


def discover_domains():
    """Discovers all available domains/shops in the vhosts directory"""
    domains = []
    vhosts_path = Path(VHOSTS_BASE)

    if not vhosts_path.exists():
        return domains

    # Scan all directories under vhosts
    for domain_dir in vhosts_path.iterdir():
        if domain_dir.is_dir():
            # Check whether a logs directory exists
            logs_dir = domain_dir / "logs"
            if logs_dir.exists() and logs_dir.is_dir():
                # Check whether log files are present
                log_files = list(logs_dir.glob("access*log*"))
                if log_files:
                    domain_name = domain_dir.name
                    # Filter out system directories
                    if not domain_name.startswith('.') and domain_name not in ['default', 'system']:
                        domains.append(domain_name)

    return sorted(domains)
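
# Expected on-disk layout (illustrative; inferred from the Plesk-style paths
# used above, with a placeholder domain):
#   /var/www/vhosts/shop.example.de/logs/access_log
#   /var/www/vhosts/shop.example.de/logs/access_ssl_log.processed.1.gz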


def select_domain_interactive(domains):
    """Interactive domain selection with logging"""
    interaction_log = []

    interaction_log.append("\n" + "="*67)
    interaction_log.append("VERFÜGBARE SHOPS/DOMAINS")
    interaction_log.append("="*67)
    interaction_log.append("")

    print("\n" + "="*67)
    print("VERFÜGBARE SHOPS/DOMAINS")
    print("="*67)
    print()

    if not domains:
        msg = f"❌ Keine Domains mit Log-Dateien gefunden in {VHOSTS_BASE}"
        print(msg)
        interaction_log.append(msg)
        return None, interaction_log

    # Show a numbered list
    for i, domain in enumerate(domains, 1):
        # Count the log files for extra context
        logs_dir = Path(VHOSTS_BASE) / domain / "logs"
        log_count = len(list(logs_dir.glob("access*log*")))
        line = f" {i:2d}. {domain:<40} ({log_count} Log-Dateien)"
        print(line)
        interaction_log.append(line)

    interaction_log.append("")
    interaction_log.append("Eingabemöglichkeiten:")
    interaction_log.append(" - Nummer eingeben (z.B. '1' oder '2')")
    interaction_log.append(" - Domain-Namen eingeben (z.B. 'taschengelddieb.de')")
    interaction_log.append(" - Enter für Abbruch")
    interaction_log.append("")

    print()
    print("Eingabemöglichkeiten:")
    print(" - Nummer eingeben (z.B. '1' oder '2')")
    print(" - Domain-Namen eingeben (z.B. 'taschengelddieb.de')")
    print(" - Enter für Abbruch")
    print()

    while True:
        choice = input("🔍 Welchen Shop analysieren? ").strip()
        interaction_log.append(f"🔍 Welchen Shop analysieren? {choice}")

        if not choice:
            msg = "Abbruch."
            print(msg)
            interaction_log.append(msg)
            return None, interaction_log

        # Was a number entered?
        try:
            num = int(choice)
            if 1 <= num <= len(domains):
                selected = domains[num - 1]
                msg = f"\n✅ Ausgewählt: {selected}"
                print(msg)
                interaction_log.append(msg)
                return selected, interaction_log
            else:
                msg = f"❌ Ungültige Nummer. Bitte 1-{len(domains)} eingeben."
                print(msg)
                interaction_log.append(msg)
        except ValueError:
            # Was a domain name entered?
            if choice in domains:
                msg = f"\n✅ Ausgewählt: {choice}"
                print(msg)
                interaction_log.append(msg)
                return choice, interaction_log
            # Is there a partial match?
            matches = [d for d in domains if choice.lower() in d.lower()]
            if len(matches) == 1:
                msg = f"\n✅ Ausgewählt: {matches[0]}"
                print(msg)
                interaction_log.append(msg)
                return matches[0], interaction_log
            elif len(matches) > 1:
                msg = f"❌ Mehrere Treffer gefunden: {', '.join(matches)}"
                print(msg)
                interaction_log.append(msg)
                print(" Bitte genauer spezifizieren.")
                interaction_log.append(" Bitte genauer spezifizieren.")
            else:
                msg = f"❌ Domain '{choice}' nicht gefunden."
                print(msg)
                interaction_log.append(msg)


def get_timespan_interactive():
    """Interactive timespan input with logging"""
    interaction_log = []

    interaction_log.append("\n" + "="*67)
    interaction_log.append("ZEITSPANNE FÜR ANALYSE")
    interaction_log.append("="*67)
    interaction_log.append("")
    interaction_log.append("Beispiele:")
    interaction_log.append(" - '6' oder '6h' für die letzten 6 Stunden")
    interaction_log.append(" - '24' oder '24h' für die letzten 24 Stunden")
    interaction_log.append(" - '72' oder '72h' für die letzten 3 Tage")
    interaction_log.append(" - Enter für Standard (48 Stunden)")
    interaction_log.append("")

    print("\n" + "="*67)
    print("ZEITSPANNE FÜR ANALYSE")
    print("="*67)
    print()
    print("Beispiele:")
    print(" - '6' oder '6h' für die letzten 6 Stunden")
    print(" - '24' oder '24h' für die letzten 24 Stunden")
    print(" - '72' oder '72h' für die letzten 3 Tage")
    print(" - Enter für Standard (48 Stunden)")
    print()

    while True:
        choice = input("⏰ Zeitspanne in Stunden: ").strip()
        interaction_log.append(f"⏰ Zeitspanne in Stunden: {choice}")

        if not choice:
            msg = "✅ Verwende Standard: 48 Stunden"
            print(msg)
            interaction_log.append(msg)
            return 48, interaction_log

        # Strip a trailing 'h'/'H' if present
        choice = choice.rstrip('hH')

        try:
            hours = int(choice)
            if hours <= 0:
                msg = "❌ Zeitspanne muss größer als 0 sein."
                print(msg)
                interaction_log.append(msg)
            elif hours > 8760:  # 1 year
                msg = "❌ Zeitspanne zu groß (max. 8760 Stunden = 1 Jahr)"
                print(msg)
                interaction_log.append(msg)
            else:
                msg = f"✅ Zeitspanne: {hours} Stunden"
                print(msg)
                interaction_log.append(msg)
                return hours, interaction_log
        except ValueError:
            msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben."
            print(msg)
            interaction_log.append(msg)


def get_top_n_interactive():
    """Interactive top-N input with logging"""
    interaction_log = []

    interaction_log.append("\n" + "="*67)
    interaction_log.append("ANZAHL DER ERGEBNISSE")
    interaction_log.append("="*67)
    interaction_log.append("")
    interaction_log.append("Beispiele:")
    interaction_log.append(" - '50' für Top 50 Ergebnisse")
    interaction_log.append(" - '100' für Top 100 Ergebnisse")
    interaction_log.append(" - '0' oder 'all' für ALLE Ergebnisse")
    interaction_log.append(" - Enter für ALLE Ergebnisse")
    interaction_log.append("")

    print("\n" + "="*67)
    print("ANZAHL DER ERGEBNISSE")
    print("="*67)
    print()
    print("Beispiele:")
    print(" - '50' für Top 50 Ergebnisse")
    print(" - '100' für Top 100 Ergebnisse")
    print(" - '0' oder 'all' für ALLE Ergebnisse")
    print(" - Enter für ALLE Ergebnisse")
    print()

    while True:
        choice = input("📊 Anzahl der Top-Ergebnisse: ").strip().lower()
        interaction_log.append(f"📊 Anzahl der Top-Ergebnisse: {choice}")

        if not choice or choice in ('0', 'all', 'alle'):
            msg = "✅ Zeige ALLE Ergebnisse"
            print(msg)
            interaction_log.append(msg)
            return None, interaction_log

        try:
            top_n = int(choice)
            if top_n <= 0:
                msg = "✅ Zeige ALLE Ergebnisse"
                print(msg)
                interaction_log.append(msg)
                return None, interaction_log
            else:
                msg = f"✅ Zeige Top {top_n} Ergebnisse"
                print(msg)
                interaction_log.append(msg)
                return top_n, interaction_log
        except ValueError:
            msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben oder 'all' für alle."
            print(msg)
            interaction_log.append(msg)


def get_extreme_rate_threshold_interactive():
    """Interactive input of the extreme request-rate threshold, with logging"""
    interaction_log = []

    interaction_log.append("\n" + "="*67)
    interaction_log.append("EXTREME REQUEST-RATE SCHWELLWERT")
    interaction_log.append("="*67)
    interaction_log.append("")
    interaction_log.append("Ab welcher Request-Rate (Requests/Minute) soll eine IP")
    interaction_log.append("als EXTREM eingestuft und zum sofortigen Block empfohlen werden?")
    interaction_log.append("")
    interaction_log.append("Beispiele:")
    interaction_log.append(" - '60' = 1 Request pro Sekunde (Standard)")
    interaction_log.append(" - '120' = 2 Requests pro Sekunde")
    interaction_log.append(" - '250' = ~4 Requests pro Sekunde")
    interaction_log.append(" - '600' = 10 Requests pro Sekunde")
    interaction_log.append(" - Enter für Standard (60)")
    interaction_log.append("")

    print("\n" + "="*67)
    print("EXTREME REQUEST-RATE SCHWELLWERT")
    print("="*67)
    print()
    print("Ab welcher Request-Rate (Requests/Minute) soll eine IP")
    print("als EXTREM eingestuft und zum sofortigen Block empfohlen werden?")
    print()
    print("Beispiele:")
    print(" - '60' = 1 Request pro Sekunde (Standard)")
    print(" - '120' = 2 Requests pro Sekunde")
    print(" - '250' = ~4 Requests pro Sekunde")
    print(" - '600' = 10 Requests pro Sekunde")
    print(" - Enter für Standard (60)")
    print()

    while True:
        choice = input("🔴 Extreme Rate Schwellwert (Requests/Minute): ").strip()
        interaction_log.append(f"🔴 Extreme Rate Schwellwert (Requests/Minute): {choice}")

        if not choice:
            msg = "✅ Verwende Standard: 60 Requests/Minute"
            print(msg)
            interaction_log.append(msg)
            return 60, interaction_log

        try:
            threshold = int(choice)
            if threshold <= 0:
                msg = "❌ Schwellwert muss größer als 0 sein."
                print(msg)
                interaction_log.append(msg)
            elif threshold > 10000:
                msg = "❌ Schwellwert zu hoch (max. 10000)"
                print(msg)
                interaction_log.append(msg)
            else:
                reqs_per_sec = threshold / 60
                msg = f"✅ Extreme Rate Schwellwert: {threshold} Requests/Minute (~{reqs_per_sec:.1f}/Sekunde)"
                print(msg)
                interaction_log.append(msg)
                return threshold, interaction_log
        except ValueError:
            msg = "❌ Ungültige Eingabe. Bitte eine Zahl eingeben."
            print(msg)
            interaction_log.append(msg)


class LogAnalyzer:
    def __init__(self, hours=48, top_n=None, domain=None, extreme_rate_threshold=60, interactive_log=None):
        if not domain:
            raise ValueError("Domain muss angegeben werden!")
        self.domain = domain
        self.log_dir = f"/var/www/vhosts/{domain}/logs"
        self.hours = hours
        self.top_n = top_n
        self.extreme_rate_threshold = extreme_rate_threshold
        self.cutoff_time = datetime.now() - timedelta(hours=hours)
        self.interactive_log = interactive_log or []  # Stores the interactive session log

        # Number of CPU cores for parallel processing
        self.total_cores = multiprocessing.cpu_count()
        self.max_workers = max(1, self.total_cores - 4)  # Reserve 4 cores for the system

        # Data structures for the analysis
        self.all_requests = []
        self.ip_cache = {}

        # Create the output file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        script_dir = Path(__file__).parent
        if self.top_n:
            self.output_file = script_dir / f"shop_analyse_{hours}h_top{top_n}_{timestamp}.txt"
        else:
            self.output_file = script_dir / f"shop_analyse_{hours}h_ALL_{timestamp}.txt"

    def print_and_log(self, message="", end="\n"):
        """Writes the text both to the console and to the output file"""
        print(message, end=end)
        # Re-opening in append mode on every call is slow for large reports,
        # but keeps the method safe to call from worker threads and processes.
        with open(self.output_file, 'a', encoding='utf-8') as f:
            f.write(message + end)

    def parse_apache_timestamp(self, timestamp_str):
        """Parses Apache log timestamps [DD/Mon/YYYY:HH:MM:SS +ZONE]"""
        try:
            # Format: [10/Jan/2024:15:30:45 +0100]
            match = re.match(r'\[(\d{2})/(\w{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2})', timestamp_str)
            if match:
                day = int(match.group(1))
                month = MONTH_NAMES.get(match.group(2), 1)
                year = int(match.group(3))
                hour = int(match.group(4))
                minute = int(match.group(5))
                second = int(match.group(6))

                return datetime(year, month, day, hour, minute, second)
        except Exception:
            pass
        return None
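
    # Illustrative example (not executed):
    #   parse_apache_timestamp("[10/Jan/2024:15:30:45 +0100]")
    #   -> datetime(2024, 1, 10, 15, 30, 45)
    # The timezone offset is ignored, so the cutoff comparison implicitly
    # assumes the logs are written in the server's local time.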

    def parse_log_line(self, line):
        """Parses one Apache/Nginx log line"""
        # Apache Combined Log Format:
        # IP - - [timestamp] "METHOD URL HTTP/X.X" status size "referer" "user-agent"
        pattern = r'^(\S+) \S+ \S+ (\[[^\]]+\]) "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'

        match = re.match(pattern, line)
        if match:
            ip = match.group(1)
            timestamp_str = match.group(2)
            request = match.group(3)
            status = match.group(4)
            size = match.group(5)
            referer = match.group(6)
            user_agent = match.group(7)

            # Parse the timestamp
            timestamp = self.parse_apache_timestamp(timestamp_str)

            # Parse the request line
            request_match = re.match(r'^(\w+) (\S+)', request)
            method = request_match.group(1) if request_match else ""
            url = request_match.group(2) if request_match else ""

            return {
                'ip': ip,
                'timestamp': timestamp,
                'method': method,
                'url': url,
                'status': status,
                'size': size,
                'referer': referer,
                'user_agent': user_agent,
                'raw_line': line
            }
        return None
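
    # Illustrative example of a combined-format line this parser accepts
    # (IP and path are placeholders):
    #   203.0.113.7 - - [10/Jan/2024:15:30:45 +0100] "GET /index.php HTTP/1.1" 200 5123 "-" "Mozilla/5.0"
    # -> {'ip': '203.0.113.7', 'method': 'GET', 'url': '/index.php',
    #     'status': '200', 'size': '5123', ...}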

    def process_log_file(self, filepath):
        """Processes a single log file"""
        requests = []

        try:
            # Open the file (gzipped or plain)
            if filepath.suffix == '.gz':
                file_handle = gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore')
            else:
                file_handle = open(filepath, 'r', encoding='utf-8', errors='ignore')

            with file_handle:
                for line in file_handle:
                    parsed = self.parse_log_line(line.strip())
                    if parsed and parsed['timestamp']:
                        # Keep only requests inside the analysis window
                        if parsed['timestamp'] >= self.cutoff_time:
                            requests.append(parsed)

        except Exception as e:
            self.print_and_log(f" Fehler beim Verarbeiten von {filepath}: {e}")

        return requests

    def get_ip_info(self, ip):
        """Fetches detailed IP information via whois"""
        # Check the cache first
        if ip in self.ip_cache:
            return self.ip_cache[ip]

        try:
            # Try GeoIP
            country = "??"
            try:
                result = subprocess.run(['geoiplookup', ip],
                                        capture_output=True, text=True, timeout=2)
                if result.returncode == 0 and 'GeoIP Country Edition:' in result.stdout:
                    country = result.stdout.split('GeoIP Country Edition:')[1].split(',')[0].strip()
            except Exception:
                pass

            # Try whois via Team Cymru; the leading space in the query is
            # intentional. Replies are pipe-delimited:
            # "AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name"
            asn = "?"
            as_name = "Unknown"
            try:
                result = subprocess.run(['whois', '-h', 'whois.cymru.com', f' -v {ip}'],
                                        capture_output=True, text=True, timeout=3)
                if result.returncode == 0:
                    lines = result.stdout.strip().split('\n')
                    if lines:
                        parts = [p.strip() for p in lines[-1].split('|')]
                        if len(parts) >= 7:
                            asn = parts[0]
                            as_name = parts[-1][:80]
            except Exception:
                pass

            # Fall back to regular whois
            if asn == "?" or asn == "NA":
                try:
                    result = subprocess.run(['whois', ip],
                                            capture_output=True, text=True, timeout=5)
                    if result.returncode == 0:
                        whois_text = result.stdout

                        # Extract the ASN
                        asn_match = re.search(r'(?:aut-num|origin|originas):\s*(?:AS)?(\d+)',
                                              whois_text, re.IGNORECASE)
                        if asn_match:
                            asn = asn_match.group(1)

                        # Extract the organisation
                        org_patterns = [
                            r'(?:org-name|orgname|organisation|organization):\s*(.+)',
                            r'descr:\s*(.+)',
                            r'netname:\s*(.+)'
                        ]

                        for pattern in org_patterns:
                            org_match = re.search(pattern, whois_text, re.IGNORECASE)
                            if org_match:
                                as_name = org_match.group(1).strip()[:80]
                                break
                except Exception:
                    pass

            info = f"{country} | AS{asn} {as_name}"
            self.ip_cache[ip] = info
            return info

        except Exception:
            info = "?? | Lookup failed"
            self.ip_cache[ip] = info
            return info
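
    # Illustrative Team Cymru reply for `whois -h whois.cymru.com " -v 1.1.1.1"`
    # (field values shown from memory, for orientation only):
    #   AS    | IP      | BGP Prefix | CC | Registry | Allocated  | AS Name
    #   13335 | 1.1.1.1 | 1.1.1.0/24 | US | arin     | 2010-07-14 | CLOUDFLARENET, US
    # The parser above keeps the first field (the ASN) and the last (the AS name).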

    def get_ip_info_batch(self, ips):
        """Fetches IP information for several IPs in parallel"""
        # The threads share self.ip_cache; plain dict reads/writes are safe
        # here because each key is only ever written with its final value.
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.get_ip_info, ip): ip for ip in ips}
            results = {}
            for future in as_completed(futures):
                ip = futures[future]
                try:
                    results[ip] = future.result()
                except Exception:
                    results[ip] = "?? | Lookup failed"
            return results

    def categorize_ip(self, ip_info):
        """Categorizes an IP based on its whois information"""
        info_lower = ip_info.lower()

        if any(x in info_lower for x in ['amazon', 'aws', 'ec2', 'azure', 'google cloud',
                                         'gcp', 'digitalocean', 'linode', 'vultr', 'ovh cloud']):
            return 'cloud'
        elif any(x in info_lower for x in ['datacenter', 'data center', 'hosting',
                                           'server', 'colocation', 'colo']):
            return 'datacenter'
        elif any(x in info_lower for x in ['hetzner', 'contabo', 'netcup', 'strato',
                                           '1und1', 'ionos']):
            return 'hosting'
        elif any(x in info_lower for x in ['vpn', 'proxy', 'private relay']):
            return 'vpn'
        elif any(x in info_lower for x in ['tor', 'tor-exit', 'anonymizer']):
            return 'tor'
        elif any(x in info_lower for x in ['telecom', 'telekom', 'vodafone', 'o2',
                                           'kabel', 'broadband', 'dsl', 'cable',
                                           'fiber', 'residential']):
            return 'residential'
        else:
            return 'isp'
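
    # Illustrative classifications (not executed):
    #   categorize_ip("US | AS16509 Amazon.com, Inc.")   -> 'cloud'
    #   categorize_ip("DE | AS3320 Deutsche Telekom AG") -> 'residential'
    # Matching is substring-based and branch order matters: the substring
    # 'hosting' already matches in the 'datacenter' branch, so names
    # containing it never reach the 'hosting' category.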

    def is_bot_user_agent(self, user_agent):
        """Checks whether a user agent looks like a bot"""
        ua_lower = user_agent.lower()
        bot_patterns = [
            'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python',
            'go-http', 'java', 'apache', 'scrapy', 'requests', 'aiohttp',
            'axios', 'node-fetch', 'http.rb', 'libwww', 'semrush', 'ahrefs',
            'mj12bot', 'dotbot', 'yandex', 'baidu', 'bingbot', 'googlebot',
            'duckduck', 'slurp', 'facebot', 'whatsapp', 'telegram', 'discord', 'slack'
        ]
        return any(pattern in ua_lower for pattern in bot_patterns)
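
    # Illustrative checks (not executed):
    #   is_bot_user_agent("Mozilla/5.0 (compatible; Googlebot/2.1)")  -> True
    #   is_bot_user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64)") -> False
    # Generic substrings like 'python' and 'java' also flag plain HTTP
    # client libraries, which suits this kind of bot triage.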

    def analyze(self):
        """Main analysis entry point"""
        # Write the interactive session log first, if there is one
        if self.interactive_log:
            self.print_and_log("=" * 67)
            self.print_and_log("INTERAKTIVES SESSION-PROTOKOLL")
            self.print_and_log("=" * 67)
            for line in self.interactive_log:
                self.print_and_log(line)
            self.print_and_log("=" * 67)
            self.print_and_log()

        self.print_and_log("=" * 67)
        if self.top_n:
            self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, Top {self.top_n})")
        else:
            self.print_and_log(f"Request-Analyse für {self.domain} (letzte {self.hours} Stunden, ALLE Ergebnisse)")
        self.print_and_log("=" * 67)
        self.print_and_log()

        self.print_and_log(f"Analysiere Zeitraum: {self.cutoff_time.strftime('%d.%m.%Y %H:%M')} bis {datetime.now().strftime('%d.%m.%Y %H:%M')}")
        self.print_and_log()

        # 1. Collect all log files
        self.print_and_log("Extrahiere Daten aus Logs...")
        self.print_and_log("-" * 67)

        log_files = []
        log_dir = Path(self.log_dir)

        # Plain logs
        for pattern in ['access_log', 'access_ssl_log', 'proxy_access_log', 'proxy_access_ssl_log']:
            if (log_dir / pattern).exists():
                log_files.append(log_dir / pattern)

        # Rotated, gzipped logs
        for pattern in ['access_log.processed*.gz', 'access_ssl_log.processed*.gz']:
            log_files.extend(log_dir.glob(pattern))

        # Process the logs in parallel. Note: self must remain picklable so
        # ProcessPoolExecutor can ship the bound method to the workers.
        all_requests = []
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.process_log_file, f): f for f in log_files}
            for future in as_completed(futures):
                filepath = futures[future]
                try:
                    requests = future.result()
                    count = len(requests)
                    all_requests.extend(requests)
                    self.print_and_log(f" Verarbeite {filepath.name}... {count} Einträge")
                except Exception as e:
                    self.print_and_log(f" Fehler bei {filepath.name}: {e}")

        self.all_requests = all_requests
        total = len(self.all_requests)

        self.print_and_log()
        self.print_and_log("-" * 67)
        self.print_and_log("1. GESAMTZAHL DER REQUESTS")
        self.print_and_log("-" * 67)
        self.print_and_log()
        self.print_and_log(f"GESAMT: {total} Requests in den letzten {self.hours} Stunden")
        self.print_and_log(f"Durchschnitt: {total // self.hours} Requests/Stunde")
        self.print_and_log()

        # 2. Top URLs
        self.analyze_urls()

        # 3. Top IPs
        self.analyze_ips()

        # 4. User agents
        self.analyze_user_agents()

        # 5. Status codes
        self.analyze_status_codes()

        # 6. Request methods
        self.analyze_methods()

        # 7. Bots
        self.analyze_bots()

        # 8. Hourly distribution
        self.analyze_hourly_distribution()

        # 9. Suspicious activity
        self.analyze_suspicious_activity()

        # 10. IPs with many 404 errors
        self.analyze_404_ips()

        # 11. IP categorization
        self.analyze_ip_categories()

        # 12. Request-rate analysis
        self.analyze_request_rates()

        # 13. Bot-pattern analysis
        self.analyze_bot_patterns()

        self.print_and_log()
        self.print_and_log("=" * 67)
        self.print_and_log("Analyse abgeschlossen")
        self.print_and_log("=" * 67)
        self.print_and_log()
        self.print_and_log("Ausgabe wurde gespeichert in:")
        self.print_and_log(f" {self.output_file}")

    def analyze_urls(self):
        """Analyzes the most frequently requested URLs"""
        self.print_and_log("-" * 67)
        if self.top_n:
            self.print_and_log(f"2. TOP {self.top_n} ANGEFRAGTE URLs/PFADE")
        else:
            self.print_and_log("2. ALLE ANGEFRAGTEN URLs/PFADE (sortiert nach Häufigkeit)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        # Count URLs and their top IPs
        url_counts = Counter()
        url_ips = defaultdict(Counter)

        for req in self.all_requests:
            url = req['url']
            ip = req['ip']
            url_counts[url] += 1
            url_ips[url][ip] += 1

        # Sort and limit
        top_urls = url_counts.most_common(self.top_n)

        # Prefetch IP info for the top IPs
        all_top_ips = set()
        for url, _ in top_urls[:50]:  # Only for the first 50 URLs
            if url in url_ips:
                top_ip = url_ips[url].most_common(1)[0][0]
                all_top_ips.add(top_ip)

        ip_infos = self.get_ip_info_batch(list(all_top_ips))

        # Show the results
        for url, count in top_urls:
            if url in url_ips:
                top_ip, top_count = url_ips[url].most_common(1)[0]
                # Fall back to a (cached) single lookup beyond the prefetched top 50
                ip_info = ip_infos.get(top_ip) or self.get_ip_info(top_ip)
                self.print_and_log(f"{count:10d} {url[:80]:<80} (Top: {top_ip} x{top_count} - {ip_info})")
            else:
                self.print_and_log(f"{count:10d} {url}")
        self.print_and_log()

    def analyze_ips(self):
        """Analyzes the most frequent IPs"""
        self.print_and_log("-" * 67)
        if self.top_n:
            self.print_and_log(f"3. TOP {self.top_n} IP-ADRESSEN (potenzielle Bots)")
        else:
            self.print_and_log("3. ALLE IP-ADRESSEN (sortiert nach Häufigkeit)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        # Count the IPs
        ip_counts = Counter(req['ip'] for req in self.all_requests)
        top_ips = ip_counts.most_common(self.top_n)

        # Fetch IP info in parallel
        self.print_and_log(f"Sammle IP-Informationen für {len(top_ips)} IPs...")
        self.print_and_log(f"(Parallel-Modus mit {self.max_workers} Cores)")
        self.print_and_log()

        ip_list = [ip for ip, _ in top_ips]
        ip_infos = self.get_ip_info_batch(ip_list)

        # Show the results
        for ip, count in top_ips:
            info = ip_infos.get(ip, "Lookup fehlgeschlagen")
            self.print_and_log(f"{count:10d} {ip:<15} ({info})")
        self.print_and_log()

    def analyze_user_agents(self):
        """Analyzes user agents"""
        self.print_and_log("-" * 67)
        if self.top_n:
            self.print_and_log(f"4. USER-AGENTS (Top {self.top_n})")
        else:
            self.print_and_log("4. ALLE USER-AGENTS (sortiert nach Häufigkeit)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        # Count user agents and their top IPs
        ua_counts = Counter()
        ua_ips = defaultdict(Counter)

        for req in self.all_requests:
            ua = req['user_agent']
            ip = req['ip']
            ua_counts[ua] += 1
            ua_ips[ua][ip] += 1

        top_uas = ua_counts.most_common(self.top_n)

        # Prefetch IP info for the top IPs
        all_top_ips = set()
        for ua, _ in top_uas[:30]:  # Only for the first 30 user agents
            if ua in ua_ips:
                top_ip = ua_ips[ua].most_common(1)[0][0]
                all_top_ips.add(top_ip)

        ip_infos = self.get_ip_info_batch(list(all_top_ips))

        # Show the results
        for ua, count in top_uas:
            ua_display = ua[:100] + "..." if len(ua) > 100 else ua
            self.print_and_log(f"{count:10d} {ua_display}")

            if ua in ua_ips:
                top_ip, top_count = ua_ips[ua].most_common(1)[0]
                ip_info = ip_infos.get(top_ip) or self.get_ip_info(top_ip)
                self.print_and_log(f" (Top-IP: {top_ip} x{top_count} - {ip_info})")
        self.print_and_log()

    def analyze_status_codes(self):
        """Analyzes HTTP status codes"""
        self.print_and_log("-" * 67)
        self.print_and_log("5. HTTP-STATUS-CODES")
        self.print_and_log("-" * 67)
        self.print_and_log()

        status_counts = Counter(req['status'] for req in self.all_requests)
        for status, count in status_counts.most_common():
            self.print_and_log(f"{count:10d} HTTP {status}")
        self.print_and_log()

    def analyze_methods(self):
        """Analyzes request methods"""
        self.print_and_log("-" * 67)
        self.print_and_log("6. REQUESTS NACH METHODE")
        self.print_and_log("-" * 67)
        self.print_and_log()

        method_counts = Counter(req['method'] for req in self.all_requests if req['method'])
        for method, count in method_counts.most_common():
            self.print_and_log(f"{count:10d} {method}")
        self.print_and_log()

    def analyze_bots(self):
        """Analyzes bot traffic"""
        self.print_and_log("-" * 67)
        if self.top_n:
            self.print_and_log(f"7. TOP {self.top_n} BOTS (identifiziert via User-Agent)")
        else:
            self.print_and_log("7. ALLE BOTS (identifiziert via User-Agent)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        bot_counts = Counter()
        for req in self.all_requests:
            if self.is_bot_user_agent(req['user_agent']):
                bot_counts[req['user_agent']] += 1

        top_bots = bot_counts.most_common(self.top_n)
        for bot, count in top_bots:
            bot_display = bot[:80] + "..." if len(bot) > 80 else bot
            self.print_and_log(f"{count:10d} {bot_display}")
        self.print_and_log()

    def analyze_hourly_distribution(self):
        """Analyzes the distribution of requests over time"""
        self.print_and_log("-" * 67)
        self.print_and_log("8. REQUESTS PRO STUNDE (zeitliche Verteilung)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        hourly_counts = Counter()
        for req in self.all_requests:
            if req['timestamp']:
                hour_key = req['timestamp'].strftime("%Y-%m-%d %H:00")
                hourly_counts[hour_key] += 1

        # Show the last 48 hours
        for hour, count in sorted(hourly_counts.items())[-48:]:
            self.print_and_log(f"{hour} {count:10d} Requests")
        self.print_and_log()

    def analyze_suspicious_activity(self):
        """Analyzes suspicious activity"""
        self.print_and_log("-" * 67)
        self.print_and_log("9. VERDÄCHTIGE AKTIVITÄTEN")
        self.print_and_log("-" * 67)
        self.print_and_log()

        # 404 URLs
        self.print_and_log("404-Fehler (häufigste nicht existierende Pfade):")
        error_404_urls = Counter()
        for req in self.all_requests:
            if req['status'] == '404':
                error_404_urls[req['url']] += 1

        top_404s = error_404_urls.most_common(self.top_n if self.top_n else 20)
        for url, count in top_404s:
            self.print_and_log(f"{count:10d} {url}")

        # POST requests
        self.print_and_log()
        self.print_and_log("Häufige POST-Requests mit IPs (potenzielle Brute-Force):")

        post_ips = defaultdict(Counter)
        for req in self.all_requests:
            if req['method'] == 'POST':
                post_ips[req['url']][req['ip']] += 1

        # Aggregate the POST requests per (IP, URL) pair
        post_counts = []
        for url, ip_counter in post_ips.items():
            for ip, count in ip_counter.items():
                post_counts.append((count, ip, url))

        post_counts.sort(reverse=True)
        top_posts = post_counts[:self.top_n] if self.top_n else post_counts[:20]

        if top_posts:
            # Fetch IP info
            post_ips_list = list(set(ip for _, ip, _ in top_posts))
            ip_infos = self.get_ip_info_batch(post_ips_list)

            for count, ip, url in top_posts:
                info = ip_infos.get(ip, "Lookup fehlgeschlagen")
                self.print_and_log(f"{count:10d} {ip:<15} → {url}")
                self.print_and_log(f" ({info})")
        self.print_and_log()

    def analyze_404_ips(self):
        """Analyzes IPs with many 404 errors"""
        self.print_and_log("-" * 67)
        if self.top_n:
            self.print_and_log(f"10. TOP {self.top_n} IP-ADRESSEN MIT MEISTEN 404-FEHLERN")
        else:
            self.print_and_log("10. ALLE IP-ADRESSEN MIT 404-FEHLERN (sortiert nach Häufigkeit)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        error_404_ips = Counter()
        for req in self.all_requests:
            if req['status'] == '404':
                error_404_ips[req['ip']] += 1

        top_404_ips = error_404_ips.most_common(self.top_n)

        if top_404_ips:
            # Fetch IP info
            ip_list = [ip for ip, _ in top_404_ips]
            ip_infos = self.get_ip_info_batch(ip_list)

            for ip, count in top_404_ips:
                info = ip_infos.get(ip, "Lookup fehlgeschlagen")
                self.print_and_log(f"{count:10d} {ip:<15} ({info})")
        self.print_and_log()

    def analyze_ip_categories(self):
        """Categorizes IPs by type"""
        self.print_and_log("-" * 67)
        self.print_and_log("11. IP-KATEGORISIERUNG NACH TYP (Top 20 je Kategorie)")
        self.print_and_log("-" * 67)
        self.print_and_log()

        self.print_and_log("Analysiere IP-Typen...")

        # Collect all unique IPs
        ip_counts = Counter(req['ip'] for req in self.all_requests)
        unique_ips = list(ip_counts.keys())

        self.print_and_log(f"Führe Parallel-Lookups für {len(unique_ips)} unique IPs durch (mit {self.max_workers} Cores)...")

        # Fetch IP info
        ip_infos = self.get_ip_info_batch(unique_ips)

        # Categorize the IPs
        categories = defaultdict(list)
        for ip, count in ip_counts.items():
            info = ip_infos.get(ip, "Unknown")
            category = self.categorize_ip(info)
            categories[category].append((count, ip, info))

        # Show the categories
        category_names = {
            'cloud': 'CLOUD-PROVIDER (AWS, Azure, GCP, etc.)',
            'datacenter': 'RECHENZENTREN / DATACENTER',
            'hosting': 'HOSTING-PROVIDER',
            'vpn': 'VPN / PROXY-DIENSTE',
            'tor': 'TOR-NETZWERK',
            'residential': 'PRIVAT-NUTZER / ISP (Telekom, Vodafone, etc.)',
            'isp': 'SONSTIGE ISP'
        }

        for cat_key, cat_name in category_names.items():
            self.print_and_log()
            self.print_and_log(f"{cat_name}:")

            if cat_key in categories:
                sorted_ips = sorted(categories[cat_key], reverse=True)[:20]
                for count, ip, info in sorted_ips:
                    self.print_and_log(f"{count:10d} {ip:<15} ({info})")
            else:
                self.print_and_log(" Keine gefunden")
        self.print_and_log()

    def analyze_request_rates(self):
        """Analyzes request rates to detect burst patterns"""
        self.print_and_log("-" * 67)
        self.print_and_log("12. REQUEST-RATE-ANALYSE (für Rate-Limiting)")
        self.print_and_log("-" * 67)
        self.print_and_log()
        self.print_and_log("Analysiere Request-Raten pro IP (Requests/Minute)...")
        self.print_and_log("Hilft bei der Entscheidung für angemessene Rate-Limits")
        self.print_and_log()

        # Collect IPs with at least 100 requests
        ip_counts = Counter(req['ip'] for req in self.all_requests)
        relevant_ips = [(count, ip) for ip, count in ip_counts.items() if count >= 100]
        relevant_ips.sort(reverse=True)

        # Analyze the top IPs
        rate_analysis = []
        for count, ip in relevant_ips[:50]:  # Analyze the top 50 IPs
            max_rate, avg_rate, burst_count = self.calculate_request_rate(ip)
            if max_rate > 0:
                rate_analysis.append((max_rate, avg_rate, count, ip, burst_count))

        # Sort by maximum rate
        rate_analysis.sort(reverse=True)

        if rate_analysis:
            self.print_and_log("=" * 67)
            self.print_and_log("TOP IPS NACH MAXIMALER REQUEST-RATE")
            self.print_and_log("=" * 67)
            self.print_and_log()
            self.print_and_log(" IP | Max/Min | Avg/Min | Total | Bursts | Info")
            self.print_and_log(" " + "-" * 63)

            # Fetch IP info
            ip_list = [ip for _, _, _, ip, _ in rate_analysis[:20]]
            ip_infos = self.get_ip_info_batch(ip_list)

            for max_rate, avg_rate, total, ip, burst_count in rate_analysis[:20]:
                info = ip_infos.get(ip, "Unknown")
                info_short = info[:40] + "..." if len(info) > 40 else info

                # Warning symbol based on the rate
                warning = ""
                if max_rate >= self.extreme_rate_threshold:
                    warning = "🔴"  # Extreme (above the configured threshold)
                elif max_rate >= self.extreme_rate_threshold * 0.5:
                    warning = "🟡"  # High (50% of the threshold)
                elif max_rate >= self.extreme_rate_threshold * 0.25:
                    warning = "⚠️"  # Elevated (25% of the threshold)

                self.print_and_log(f" {warning:<2} {ip:<15} | {max_rate:7d} | {avg_rate:7.1f} | {total:5d} | {burst_count:6d} | {info_short}")

            # Statistics
            self.print_and_log()
            self.print_and_log("=" * 67)
            self.print_and_log("RATE-LIMITING EMPFEHLUNGEN")
            self.print_and_log("=" * 67)
            self.print_and_log()

            # Compute percentiles over the per-IP maximum rates
            all_max_rates = [r[0] for r in rate_analysis]
            if all_max_rates:
                rates_sorted = sorted(all_max_rates)
                n = len(rates_sorted)
                percentile_50 = rates_sorted[n // 2]
                percentile_90 = rates_sorted[min(int(n * 0.9), n - 1)]
                percentile_99 = rates_sorted[min(int(n * 0.99), n - 1)]

                self.print_and_log("📊 Request-Rate Verteilung:")
                self.print_and_log(f" 50% der IPs: <= {percentile_50} Requests/Minute")
                self.print_and_log(f" 90% der IPs: <= {percentile_90} Requests/Minute")
                self.print_and_log(f" 99% der IPs: <= {percentile_99} Requests/Minute")
                self.print_and_log()

                # Recommendations
                self.print_and_log("💡 Empfohlene Rate-Limits basierend auf Analyse:")
                self.print_and_log()
                self.print_and_log(f" Konfigurierter Extreme-Schwellwert: {self.extreme_rate_threshold} Requests/Minute")
                self.print_and_log()

                if percentile_90 < 10:
                    self.print_and_log(" ✅ NORMAL TRAFFIC: Die meisten IPs haben niedrige Raten")
                    self.print_and_log(" - Standard-Limit: 20-30 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 5-10 Requests/10 Sekunden")
                elif percentile_90 < 30:
                    self.print_and_log(" ⚠️ MODERATE TRAFFIC: Einige IPs zeigen erhöhte Aktivität")
                    self.print_and_log(" - Standard-Limit: 30-60 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 10-15 Requests/10 Sekunden")
                else:
                    self.print_and_log(" 🔴 HIGH TRAFFIC: Viele IPs mit hohen Request-Raten")
                    self.print_and_log(" - Standard-Limit: 60-120 Requests/Minute")
                    self.print_and_log(" - Burst-Limit: 20-30 Requests/10 Sekunden")

                self.print_and_log()
                self.print_and_log(" Zusätzliche Überlegungen:")
                self.print_and_log(" - Residential IPs: Großzügigere Limits")
                self.print_and_log(" - Cloud/Datacenter: Strengere Limits")
                self.print_and_log(" - Bekannte Bots: Sehr strenge Limits oder Block")

            # IPs that should definitely be blocked
            extreme_ips = [ip for rate, _, _, ip, _ in rate_analysis if rate > self.extreme_rate_threshold]
            if extreme_ips:
                self.print_and_log()
                self.print_and_log(f" 🔴 IPs mit extremen Raten (>{self.extreme_rate_threshold}/min) - SOFORT BLOCKEN:")
                for ip in extreme_ips[:10]:
                    info = ip_infos.get(ip, "Unknown")
                    self.print_and_log(f" - {ip}: {info[:50]}")
        else:
            self.print_and_log(" Keine IPs mit genügend Daten für Rate-Analyse gefunden")

        self.print_and_log()
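
    # Worked example (illustrative): for per-IP max rates [3, 5, 8, 40, 90]
    # the report shows "50% der IPs: <= 8" and "90% der IPs: <= 90", i.e.
    # most busy IPs stay under ~8 requests/minute while a few outliers drive
    # the rate-limit recommendation.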

    def calculate_request_rate(self, ip):
        """Calculates the maximum request rate of an IP (requests per minute)"""
        # Collect all timestamps for this IP
        timestamps = []
        for req in self.all_requests:
            if req['ip'] == ip and req['timestamp']:
                timestamps.append(req['timestamp'])

        if len(timestamps) < 2:
            return 0, 0, 0  # max_rate, avg_rate, burst_count

        timestamps.sort()

        # Analyze requests in 60-second windows
        max_requests_per_minute = 0
        burst_count = 0  # Number of windows with > 10 requests

        # Sliding 60-second window, anchored at every request
        for i in range(len(timestamps)):
            window_end = timestamps[i] + timedelta(seconds=60)
            requests_in_window = 0

            for j in range(i, len(timestamps)):
                if timestamps[j] <= window_end:
                    requests_in_window += 1
                else:
                    break

            if requests_in_window > max_requests_per_minute:
                max_requests_per_minute = requests_in_window

            if requests_in_window > 10:
                burst_count += 1

        # Average rate over the whole observed span
        total_duration = (timestamps[-1] - timestamps[0]).total_seconds()
        if total_duration > 0:
            avg_rate = (len(timestamps) * 60) / total_duration
        else:
            avg_rate = 0

        return max_requests_per_minute, avg_rate, burst_count
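
    # Worked example (illustrative): timestamps at 0s, 10s, 20s and 70s give
    # a window anchored at 0s containing 3 requests, so max_rate = 3;
    # avg_rate = 4 * 60 / 70 ≈ 3.4 requests/minute; burst_count stays 0
    # because no window exceeds 10 requests.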

    def get_top_urls_for_ip(self, ip, limit=3):
        """Returns the top URLs for a specific IP"""
        url_counts = Counter()
        for req in self.all_requests:
            if req['ip'] == ip:
                url_counts[req['url']] += 1
        return url_counts.most_common(limit)

    def analyze_bot_patterns(self):
        """Extended bot-pattern analysis"""
        self.print_and_log("-" * 67)
        self.print_and_log("13. BOT-PATTERN-ANALYSE & ENTSCHEIDUNGSHILFE")
        self.print_and_log("-" * 67)
        self.print_and_log()

        if len(self.all_requests) < 10:
            self.print_and_log("⚠️ WARNUNG: Zu wenig Daten für erweiterte Analyse (< 10 Requests)")
            self.print_and_log(" Überspringe Bot-Pattern-Analyse...")
            return

        self.print_and_log("Analysiere Bot-Verhaltensmuster für fundierte Block-Entscheidungen...")
        self.print_and_log()

        # 1. IPs with a high request count but low URL variety
        self.print_and_log("=" * 67)
        self.print_and_log("VERDÄCHTIGE IPs: Hohe Request-Rate + geringe URL-Vielfalt")
        self.print_and_log("=" * 67)
        self.print_and_log("(Echte User besuchen viele verschiedene Seiten, Bots oft nur wenige)")
        self.print_and_log()

        ip_stats = defaultdict(lambda: {'count': 0, 'urls': set()})
        for req in self.all_requests:
            ip_stats[req['ip']]['count'] += 1
            ip_stats[req['ip']]['urls'].add(req['url'])

        low_variety_ips = []
        for ip, stats in ip_stats.items():
            if stats['count'] > 100:
                unique_urls = len(stats['urls'])
                ratio = (unique_urls * 100) // stats['count']
                if ratio < 5:
                    low_variety_ips.append((stats['count'], ip, unique_urls, ratio, stats['urls']))

        low_variety_ips.sort(reverse=True)

        if low_variety_ips:
            # Fetch IP info
            ip_list = [ip for _, ip, _, _, _ in low_variety_ips[:20]]
            ip_infos = self.get_ip_info_batch(ip_list)

            for count, ip, unique_urls, ratio, urls in low_variety_ips[:20]:
                info = ip_infos.get(ip, "Unknown")
                self.print_and_log(f"⚠️ {ip}: {count} requests, nur {unique_urls} unique URLs ({ratio}% Vielfalt)")
                self.print_and_log(f" {info}")
                self.print_and_log(" Top-URLs:")

                # Show the top 3 URLs
                url_counts = Counter(req['url'] for req in self.all_requests if req['ip'] == ip)
                for url, url_count in url_counts.most_common(3):
                    self.print_and_log(f" {url_count:6d} x {url}")
                self.print_and_log()
        else:
            self.print_and_log(" Keine verdächtigen IPs gefunden")
            self.print_and_log()

        # 2. User-agent + IP combinations
        self.print_and_log("=" * 67)
        self.print_and_log("USER-AGENT + IP KOMBINATIONEN (Bot-Fingerprinting)")
        self.print_and_log("=" * 67)
        self.print_and_log("(Gleicher User-Agent von vielen IPs = verteilter Bot)")
        self.print_and_log()

        ua_stats = defaultdict(lambda: {'count': 0, 'ips': set()})
        for req in self.all_requests:
            ua_stats[req['user_agent']]['count'] += 1
            ua_stats[req['user_agent']]['ips'].add(req['ip'])

        distributed_bots = []
        for ua, stats in ua_stats.items():
            unique_ips = len(stats['ips'])
            if unique_ips > 20 and stats['count'] > 1000:
                distributed_bots.append((stats['count'], unique_ips, ua, stats['ips']))

        distributed_bots.sort(reverse=True)

        if distributed_bots:
            for total_requests, unique_ips, ua, ips in distributed_bots[:30]:
                ua_short = ua[:80] + "..." if len(ua) > 80 else ua
                avg_per_ip = total_requests / unique_ips

                self.print_and_log(f"🤖 {ua_short}")
                self.print_and_log(f" {total_requests} requests von {unique_ips} verschiedenen IPs (Ø {avg_per_ip:.1f} req/IP)")

                # Show the top 3 IPs
                self.print_and_log(" Top-IPs:")
                ip_counts = Counter(req['ip'] for req in self.all_requests if req['user_agent'] == ua)
                top_ips = ip_counts.most_common(3)

                if top_ips:
                    ip_list = [ip for ip, _ in top_ips]
                    ip_infos = self.get_ip_info_batch(ip_list)

                    for ip_addr, ip_count in top_ips:
                        ip_info = ip_infos.get(ip_addr, "Unknown")[:60]
                        self.print_and_log(f" {ip_count:6d} x {ip_addr} ({ip_info})")
                self.print_and_log()
        else:
            self.print_and_log(" Keine verteilten Bots gefunden")
            self.print_and_log()

        # 3. Scanner activity (404 patterns)
        self.print_and_log("=" * 67)
        self.print_and_log("SCANNER-AKTIVITÄT: 404-Fehler Pattern")
        self.print_and_log("=" * 67)
        self.print_and_log("(Viele 404-Fehler = Scanner suchen nach Schwachstellen)")
        self.print_and_log()

        ip_404_stats = defaultdict(lambda: {'total': 0, 'errors': 0})
        for req in self.all_requests:
            ip_404_stats[req['ip']]['total'] += 1
            if req['status'] == '404':
                ip_404_stats[req['ip']]['errors'] += 1

        high_404_ips = []
        for ip, stats in ip_404_stats.items():
            if stats['total'] > 50 and stats['errors'] > 0:
                error_rate = (stats['errors'] * 100) // stats['total']
                if error_rate > 30:
                    high_404_ips.append((stats['total'], stats['errors'], error_rate, ip))

        high_404_ips.sort(key=lambda x: x[2], reverse=True)

        if high_404_ips:
            # Fetch IP info
            ip_list = [ip for _, _, _, ip in high_404_ips[:15]]
            ip_infos = self.get_ip_info_batch(ip_list)

            for total, errors, error_rate, ip in high_404_ips[:15]:
                info = ip_infos.get(ip, "Unknown")
                self.print_and_log(f"🔍 {ip}: {errors}/{total} requests sind 404-Fehler ({error_rate}%)")
                self.print_and_log(f" {info}")

                # Show the top 3 404 URLs
                self.print_and_log(" Gesuchte Pfade:")
                error_urls = Counter(req['url'] for req in self.all_requests
                                     if req['ip'] == ip and req['status'] == '404')
                for url, count in error_urls.most_common(3):
                    self.print_and_log(f" {count:6d} x {url}")
                self.print_and_log()
        else:
            self.print_and_log(" Keine IPs mit hoher 404-Rate gefunden")
            self.print_and_log()

        # 4. Recommended block list
        self.print_and_log("=" * 67)
        self.print_and_log("EMPFOHLENE BLOCK-LISTE (basierend auf Analyse)")
        self.print_and_log("=" * 67)
        self.print_and_log()
        self.print_and_log("📋 IPs zum sofortigen Blockieren (hohe Konfidenz):")
        self.print_and_log(" (Datacenter + hohe Request-Rate + geringe Vielfalt ODER hohe 404-Rate)")
        self.print_and_log()

        # Combine the candidates
        block_candidates = []

        # Low-variety IPs
        for count, ip, unique_urls, ratio, _ in low_variety_ips[:30]:
            block_candidates.append((count, ip, 'low_variety', ratio))

        # High-404 IPs
        for total, errors, error_rate, ip in high_404_ips[:30]:
            block_candidates.append((total, ip, 'high_404', error_rate))

        # Deduplicate and sort
        seen_ips = set()
        final_candidates = []
        for count, ip, reason, metric in sorted(block_candidates, reverse=True):
            if ip not in seen_ips:
                seen_ips.add(ip)
                final_candidates.append((count, ip, reason, metric))

        if final_candidates:
            # Fetch IP info
            ip_list = [ip for _, ip, _, _ in final_candidates[:30]]
            ip_infos = self.get_ip_info_batch(ip_list)

            for count, ip, reason, metric in final_candidates[:30]:
                info = ip_infos.get(ip, "Unknown")

                # Is it a datacenter/cloud IP?
                ip_type = ""
                if any(x in info.lower() for x in ['amazon', 'aws', 'azure', 'google cloud',
                                                   'digitalocean', 'datacenter', 'hosting', 'hetzner']):
                    ip_type = "[DATACENTER/CLOUD]"

                reason_text = ""
                if reason == 'low_variety':
                    reason_text = f"Geringe URL-Vielfalt ({metric}%)"
                elif reason == 'high_404':
                    reason_text = f"Hohe 404-Rate ({metric}%)"

                # Compute the request rate
                max_rate, avg_rate, burst_count = self.calculate_request_rate(ip)
                rate_info = f"Max: {max_rate}/min, Avg: {avg_rate:.1f}/min"

                self.print_and_log(f" {ip:<15} {ip_type:<20} - {reason_text}")
                self.print_and_log(f" {count} requests | {info}")
                self.print_and_log(f" Request-Rate: {rate_info}")

                # Show the top URLs for this IP
                top_urls = self.get_top_urls_for_ip(ip, 3)
                if top_urls:
                    self.print_and_log(" Top angefragte URLs:")
                    for url, url_count in top_urls:
                        url_display = url[:60] + "..." if len(url) > 60 else url
                        self.print_and_log(f" {url_count:6d}x {url_display}")

                # Warn on very high rates
                if max_rate > self.extreme_rate_threshold:
                    self.print_and_log(f" ⚠️ WARNUNG: Sehr hohe Request-Rate! ({max_rate} Requests/Minute)")
                elif max_rate > self.extreme_rate_threshold * 0.75:
                    self.print_and_log(f" ⚠️ Erhöhte Request-Rate: {max_rate} Requests/Minute")

                self.print_and_log()
        else:
            self.print_and_log(" Keine verdächtigen IPs für Block-Liste gefunden")

        self.print_and_log()
        self.print_and_log("💡 HINWEIS: Überprüfe diese IPs manuell bevor du sie blockierst!")
        self.print_and_log(" - Residential IPs (Telekom, Vodafone) könnten echte User sein")
        self.print_and_log(" - Cloud/Datacenter IPs sind meist Bots")


def main():
    """Main entry point with an interactive mode"""

    # With no arguments, start the interactive mode
    if len(sys.argv) == 1:
        # Collect all interaction logs
        all_interaction_logs = []

        all_interaction_logs.append("\n" + "="*67)
        all_interaction_logs.append(" JTL-SHOP LOG-ANALYSE TOOL ".center(67))
        all_interaction_logs.append("="*67)
        all_interaction_logs.append("\nInteraktiver Modus - Keine Parameter angegeben")
        all_interaction_logs.append("Starte Schritt-für-Schritt Konfiguration...")

        print("\n" + "="*67)
        print(" JTL-SHOP LOG-ANALYSE TOOL ".center(67))
        print("="*67)
        print("\nInteraktiver Modus - Keine Parameter angegeben")
        print("Starte Schritt-für-Schritt Konfiguration...")

        # 1. Select the domain
        domains = discover_domains()
        selected_domain, domain_log = select_domain_interactive(domains)
        all_interaction_logs.extend(domain_log)
        if not selected_domain:
            sys.exit(0)

        # 2. Select the timespan
        hours, timespan_log = get_timespan_interactive()
        all_interaction_logs.extend(timespan_log)

        # 3. Select top-N
        top_n, topn_log = get_top_n_interactive()
        all_interaction_logs.extend(topn_log)

        # 4. Select the extreme-rate threshold
        extreme_threshold, threshold_log = get_extreme_rate_threshold_interactive()
        all_interaction_logs.extend(threshold_log)

        # Summary
        summary_lines = [
            "\n" + "="*67,
            "ANALYSE-KONFIGURATION",
            "="*67,
            f" Domain: {selected_domain}",
            f" Zeitspanne: {hours} Stunden",
        ]

        if top_n:
            summary_lines.append(f" Ergebnisse: Top {top_n}")
        else:
            summary_lines.append(" Ergebnisse: ALLE")

        summary_lines.append(f" Extreme Rate: {extreme_threshold} Requests/Minute (~{extreme_threshold/60:.1f}/Sekunde)")
        summary_lines.append("="*67)

        for line in summary_lines:
            print(line)
            all_interaction_logs.append(line)

        confirm = input("\n📌 Analyse starten? [Y/n]: ").strip().lower()
        all_interaction_logs.append(f"\n📌 Analyse starten? [Y/n]: {confirm}")

        if confirm and confirm not in ['y', 'yes', 'j', 'ja']:
            msg = "Abbruch."
            print(msg)
            all_interaction_logs.append(msg)
            sys.exit(0)

        msg = "\n🚀 Starte Analyse...\n"
        print(msg)
        all_interaction_logs.append(msg)

        # Run the analysis with the chosen parameters and the interaction log
        analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=selected_domain,
                               extreme_rate_threshold=extreme_threshold,
                               interactive_log=all_interaction_logs)
        try:
            analyzer.analyze()
        except KeyboardInterrupt:
            print("\n\nAnalyse abgebrochen durch Benutzer.")
            sys.exit(0)
        except Exception as e:
            print(f"\nFehler während der Analyse: {e}")
            import traceback
            traceback.print_exc()
            sys.exit(1)

    else:
        # Classic mode with command-line arguments
        parser = argparse.ArgumentParser(
            description='Request-Analyse für JTL-Shop mit Bot-Erkennung',
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Verwendung:

  INTERAKTIVER MODUS (empfohlen):
    %(prog)s                        # Startet interaktive Shop-Auswahl und Konfiguration

  KOMMANDOZEILEN-MODUS:
    %(prog)s <domain> <stunden> [top_n]
    %(prog)s <stunden> [top_n]      # Nutzt Standard-Domain

Beispiele:
  %(prog)s                          # Interaktiver Modus
  %(prog)s taschengelddieb.de 24h   # Spezifische Domain, 24h, alle Ergebnisse
  %(prog)s 24h                      # Standard-Domain, 24h, alle Ergebnisse
  %(prog)s 12h 50                   # Standard-Domain, 12h, Top 50
  %(prog)s shop.de 72h 100          # Spezifische Domain, 72h, Top 100
"""
        )

        # Flexible positional arguments
        parser.add_argument('arg1', nargs='?', help='Domain oder Zeitspanne')
        parser.add_argument('arg2', nargs='?', help='Zeitspanne oder Top-N')
        parser.add_argument('arg3', nargs='?', help='Top-N oder Extreme-Rate')
        # Note: arg4 is accepted but not evaluated; use --extreme-rate instead.
        parser.add_argument('arg4', nargs='?', help='Extreme-Rate (optional)')
        parser.add_argument('--extreme-rate', type=int, default=60,
                            help='Extreme Rate Schwellwert in Requests/Minute (Standard: 60)')

        args = parser.parse_args()

        # Heuristic argument interpretation
        domain = None
        timespan = None
        top_n = None
        extreme_rate = args.extreme_rate  # The --extreme-rate flag provides the default

        # Is arg1 a domain (contains a dot) or a timespan?
        if args.arg1:
            if '.' in args.arg1:
                # It is a domain
                domain = args.arg1
                timespan = args.arg2
                if args.arg3:
                    try:
                        top_n = int(args.arg3)
                    except (ValueError, TypeError):
                        pass
            elif args.arg2 and not any(c in args.arg2 for c in ['h', 'H']):
                # arg1 is the timespan and arg2 is top_n
                timespan = args.arg1
                try:
                    top_n = int(args.arg2)
                except (ValueError, TypeError):
                    pass
            else:
                # It is a timespan
                timespan = args.arg1
                if args.arg2:
                    try:
                        top_n = int(args.arg2)
                    except (ValueError, TypeError):
                        # Maybe it is a domain
                        if '.' in args.arg2:
                            domain = args.arg2

        # If no domain was given, try the default or show the selection
        if not domain:
            domains = discover_domains()
            if len(domains) == 1:
                domain = domains[0]
                print(f"Verwende einzige verfügbare Domain: {domain}")
            elif 'taschengelddieb.de' in domains:
                domain = 'taschengelddieb.de'
                print(f"Verwende Standard-Domain: {domain}")
            else:
                # Interactive selection
                domain, _ = select_domain_interactive(domains)
                if not domain:
                    sys.exit(0)

        # Parse the timespan
        if not timespan:
            hours = 48
            print(f"Keine Zeitspanne angegeben, nutze Standard: {hours}h")
        else:
            timespan = timespan.rstrip('hH')
            try:
                hours = int(timespan)
            except ValueError:
                print(f"Fehler: Ungültige Zeitspanne '{timespan}'")
                print("Nutze Format: <Stunden>h oder <Stunden>")
                sys.exit(1)

        # Print the configuration
        if top_n:
            print(f"Starte Analyse für die letzten {hours} Stunden (Top {top_n})...")
        else:
            print(f"Starte Analyse für die letzten {hours} Stunden (ALLE Ergebnisse)...")

        print(f"Domain: {domain}")
        print(f"Extreme Rate Schwellwert: {extreme_rate} Requests/Minute")
        print(f"CPU-Cores: {multiprocessing.cpu_count()}")
        print()

        # Run the analysis
        analyzer = LogAnalyzer(hours=hours, top_n=top_n, domain=domain,
                               extreme_rate_threshold=extreme_rate)

        try:
            analyzer.analyze()
        except KeyboardInterrupt:
            print("\n\nAnalyse abgebrochen durch Benutzer.")
            sys.exit(0)
        except Exception as e:
            print(f"\nFehler während der Analyse: {e}")
            import traceback
            traceback.print_exc()
            sys.exit(1)
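
# Example invocations (a sketch; the script filename and domain are placeholders):
#   python3 shop_log_analyse.py                             # interactive mode
#   python3 shop_log_analyse.py shop.example.de 24h 50      # 24 h, Top 50
#   python3 shop_log_analyse.py 12h 50 --extreme-rate 120   # default domain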


if __name__ == "__main__":
    main()