check-if-ip-banned.py hinzugefügt

This commit is contained in:
2025-11-27 15:55:10 +01:00
parent c37a31af03
commit d65a4145d1

987
check-if-ip-banned.py Normal file
View File

@@ -0,0 +1,987 @@
#!/usr/bin/env python3
"""
CrowdSec IP Analyzer - Comprehensive IP blocking analysis
Searches all logs, configs, services AND vhost logs (including .gz archives)
Optional dependency for better timestamp parsing:
pip install python-dateutil --break-system-packages
(Script works without it, but timestamps will be less formatted)
"""
import subprocess
import json
import os
import sys
import re
from datetime import datetime
from pathlib import Path
import argparse
from collections import defaultdict
from multiprocessing import Pool, cpu_count
from functools import partial
# Try to import dateutil for better timestamp parsing, but continue without it
try:
from dateutil import parser as date_parser
HAS_DATEUTIL = True
except ImportError:
HAS_DATEUTIL = False
class Colors:
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
BOLD = '\033[1m'
END = '\033[0m'
def search_single_file(args):
"""Worker function to search a single file for an IP address"""
filepath, ip_address, is_vhost = args
try:
# Determine if it's a compressed file
if filepath.endswith('.gz'):
cmd = f'zgrep -i "{ip_address}" "{filepath}" 2>/dev/null | head -n 100'
else:
cmd = f'grep -i "{ip_address}" "{filepath}" 2>/dev/null | head -n 100'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
if result.returncode == 0 and result.stdout:
lines = result.stdout.strip().split('\n')
return (filepath, lines)
else:
return None
except Exception:
return None
class IPAnalyzer:
def __init__(self, ip_address):
self.ip = ip_address
self.results = {
'crowdsec': {},
'bouncers': {},
'firewall': {},
'logs': {},
'services': {},
'vhosts': {}
}
# Calculate worker count: total CPUs - 6, minimum 1
total_cpus = cpu_count()
self.worker_count = max(1, total_cpus - 6)
print(f"{Colors.CYAN}Verwende {self.worker_count} CPU-Cores für parallele Verarbeitung (von {total_cpus} verfügbar){Colors.END}")
def print_header(self, text):
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}\n")
def print_section(self, text):
print(f"\n{Colors.BOLD}{Colors.YELLOW}>>> {text}{Colors.END}")
def print_finding(self, severity, text):
colors = {
'critical': Colors.RED,
'warning': Colors.YELLOW,
'info': Colors.GREEN,
'detail': Colors.WHITE
}
color = colors.get(severity, Colors.WHITE)
symbols = {
'critical': '',
'warning': '⚠️ ',
'info': '',
'detail': ''
}
symbol = symbols.get(severity, '')
print(f"{color}{symbol} {text}{Colors.END}")
def run_command(self, cmd, shell=False):
"""Run command and return output"""
try:
if shell:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
else:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
return "", "Command timeout", 1
except Exception as e:
return "", str(e), 1
def check_crowdsec_decisions(self):
"""Check CrowdSec decisions for this IP"""
self.print_section("CrowdSec Decisions")
# Get human-readable output
stdout, stderr, code = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip])
if code == 0 and stdout:
lines = stdout.strip().split('\n')
# Filter for actual decision lines (not headers/separators)
decision_lines = [l for l in lines if self.ip in l and not l.startswith('') and not l.startswith('│ ID')]
if decision_lines:
self.print_finding('critical', f"Gefunden: {len(decision_lines)} Decision(s)")
for line in decision_lines:
self.print_finding('detail', line)
self.results['crowdsec']['decisions'] = decision_lines
# Get detailed JSON output with timestamps
stdout_json, _, code_json = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip, '-o', 'json'])
if code_json == 0 and stdout_json:
try:
decisions = json.loads(stdout_json)
if decisions:
print(f"\n{Colors.BOLD}Zeitstempel der Decisions:{Colors.END}")
for decision in decisions:
decision_type = decision.get('type', 'unknown')
scenario = decision.get('scenario', 'unknown')
created_at = decision.get('created_at', '')
duration = decision.get('duration', '')
# Parse and format timestamp
if created_at:
if HAS_DATEUTIL:
try:
dt = date_parser.parse(created_at)
created_str = dt.strftime('%Y-%m-%d %H:%M:%S')
except:
created_str = created_at
else:
# Fallback: just show the raw timestamp
created_str = created_at[:19] if len(created_at) >= 19 else created_at
else:
created_str = 'N/A'
color = Colors.RED if decision_type == 'ban' else Colors.YELLOW if decision_type == 'whitelist' else Colors.WHITE
self.print_finding('detail', f"{color}[{decision_type}] {scenario} - Erstellt: {created_str} - Dauer: {duration}{Colors.END}")
self.results['crowdsec']['decision_details'] = decisions
except json.JSONDecodeError:
pass
else:
self.print_finding('info', "Keine aktiven Decisions gefunden")
else:
self.print_finding('warning', f"Fehler beim Abrufen der Decisions: {stderr}")
def check_crowdsec_alerts(self):
"""Check CrowdSec alerts for this IP"""
self.print_section("CrowdSec Alerts (mit Zeitstempeln)")
stdout, stderr, code = self.run_command(['cscli', 'alerts', 'list', '--ip', self.ip, '-l', '20'])
if code == 0 and stdout:
lines = stdout.strip().split('\n')
alert_lines = [l for l in lines if self.ip in l or 'scenario' in l.lower()]
if len(alert_lines) > 1: # More than just header
self.print_finding('warning', f"Gefunden: {len(alert_lines)-1} Alert(s)")
for line in alert_lines[:10]: # Show first 10
self.print_finding('detail', line)
self.results['crowdsec']['alerts'] = alert_lines
# Get JSON output for better timestamp parsing
stdout_json, _, code_json = self.run_command(['cscli', 'alerts', 'list', '--ip', self.ip, '-l', '20', '-o', 'json'])
if code_json == 0 and stdout_json:
try:
alerts = json.loads(stdout_json)
if alerts:
print(f"\n{Colors.BOLD}Chronologische Alert-Übersicht:{Colors.END}")
# Sort by created_at
sorted_alerts = sorted(alerts, key=lambda x: x.get('created_at', ''), reverse=True)
for alert in sorted_alerts[:10]:
scenario = alert.get('scenario', 'unknown')
created_at = alert.get('created_at', '')
decisions = alert.get('decisions', [])
decision_type = decisions[0].get('type', 'unknown') if decisions else 'no decision'
# Parse timestamp
if created_at:
if HAS_DATEUTIL:
try:
dt = date_parser.parse(created_at)
created_str = dt.strftime('%Y-%m-%d %H:%M:%S')
except:
created_str = created_at
else:
created_str = created_at[:19] if len(created_at) >= 19 else created_at
else:
created_str = 'N/A'
color = Colors.RED if decision_type == 'ban' else Colors.YELLOW if decision_type in ['whitelist', '(simul)ban'] else Colors.WHITE
self.print_finding('detail', f"{color}{created_str} - [{decision_type}] {scenario}{Colors.END}")
except json.JSONDecodeError:
pass
else:
self.print_finding('info', "Keine Alerts gefunden")
else:
self.print_finding('warning', f"Fehler beim Abrufen der Alerts: {stderr}")
def check_bouncers(self):
"""Check all registered bouncers"""
self.print_section("CrowdSec Bouncers")
stdout, stderr, code = self.run_command(['cscli', 'bouncers', 'list'])
if code == 0 and stdout:
print(stdout)
self.results['bouncers']['registered'] = stdout
# Try to get bouncer metrics
stdout2, _, code2 = self.run_command(['cscli', 'metrics', 'show', 'bouncers'])
if code2 == 0 and stdout2:
print(stdout2)
self.results['bouncers']['metrics'] = stdout2
else:
self.print_finding('warning', f"Fehler beim Abrufen der Bouncers: {stderr}")
def check_firewall_rules(self):
"""Check firewall rules (iptables, nftables)"""
self.print_section("Firewall Regeln")
# Check iptables
for table in ['filter', 'nat', 'mangle']:
stdout, _, code = self.run_command(['iptables', '-t', table, '-L', '-n', '-v'])
if code == 0 and self.ip in stdout:
self.print_finding('critical', f"IP gefunden in iptables table '{table}':")
for line in stdout.split('\n'):
if self.ip in line:
self.print_finding('detail', line)
self.results['firewall'][f'iptables_{table}'] = [l for l in stdout.split('\n') if self.ip in l]
# Check ip6tables
for table in ['filter', 'nat', 'mangle']:
stdout, _, code = self.run_command(['ip6tables', '-t', table, '-L', '-n', '-v'])
if code == 0 and self.ip in stdout:
self.print_finding('critical', f"IP gefunden in ip6tables table '{table}':")
for line in stdout.split('\n'):
if self.ip in line:
self.print_finding('detail', line)
# Check nftables
stdout, _, code = self.run_command(['nft', 'list', 'ruleset'])
if code == 0 and self.ip in stdout:
self.print_finding('critical', "IP gefunden in nftables:")
for line in stdout.split('\n'):
if self.ip in line:
self.print_finding('detail', line)
self.results['firewall']['nftables'] = [l for l in stdout.split('\n') if self.ip in l]
# Check ipset with timeout information
stdout, _, code = self.run_command(['ipset', 'list'])
if code == 0 and self.ip in stdout:
self.print_finding('critical', "IP gefunden in ipset:")
# Parse ipset output to find the IP and its timeout
current_set = None
for line in stdout.split('\n'):
if line.startswith('Name:'):
current_set = line.split('Name:')[1].strip()
elif self.ip in line:
self.print_finding('detail', f"Set: {current_set}")
self.print_finding('detail', line)
# Extract timeout if present
if 'timeout' in line:
match = re.search(r'timeout (\d+)', line)
if match:
timeout_seconds = int(match.group(1))
timeout_minutes = timeout_seconds // 60
timeout_hours = timeout_minutes // 60
remaining_minutes = timeout_minutes % 60
# Get decision creation time to calculate when it was added
if self.results['crowdsec'].get('decision_details'):
for decision in self.results['crowdsec']['decision_details']:
created_at = decision.get('created_at', '')
duration = decision.get('duration', '')
if created_at:
if HAS_DATEUTIL:
try:
import datetime as dt
created_dt = date_parser.parse(created_at)
created_str = created_dt.strftime('%Y-%m-%d %H:%M:%S')
self.print_finding('warning',
f"Hinzugefügt um: {created_str} (läuft ab in {timeout_hours}h {remaining_minutes}m)")
except:
self.print_finding('warning',
f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)")
else:
created_str = created_at[:19] if len(created_at) >= 19 else created_at
self.print_finding('warning',
f"Hinzugefügt um: {created_str} (läuft ab in {timeout_hours}h {remaining_minutes}m)")
else:
self.print_finding('warning',
f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)")
else:
self.print_finding('warning',
f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)")
self.results['firewall']['ipset'] = [l for l in stdout.split('\n') if self.ip in l]
if not self.results['firewall']:
self.print_finding('info', "IP nicht in Firewall-Regeln gefunden")
def search_systemd_journal(self):
"""Search systemd journal for IP"""
self.print_section("Systemd Journal")
# Search all journal entries
stdout, _, code = self.run_command(
f'journalctl --no-pager -n 1000 | grep -i "{self.ip}"',
shell=True
)
if code == 0 and stdout:
lines = stdout.strip().split('\n')
self.print_finding('warning', f"IP gefunden in {len(lines)} Journal-Einträgen")
# Show first 20 lines
for line in lines[:20]:
self.print_finding('detail', line[:150])
if len(lines) > 20:
self.print_finding('info', f"... und {len(lines)-20} weitere Einträge")
self.results['logs']['journalctl'] = lines
else:
self.print_finding('info', "IP nicht im Journal gefunden")
def search_log_files(self):
"""Search all log files in /var/log using parallel processing"""
self.print_section("System Log-Dateien durchsuchen (parallel)")
log_dirs = ['/var/log']
files_to_search = []
for log_dir in log_dirs:
if not os.path.exists(log_dir):
continue
# Find all log files
for root, dirs, files in os.walk(log_dir):
# Skip some directories
dirs[:] = [d for d in dirs if d not in ['journal', 'private']]
for file in files:
filepath = os.path.join(root, file)
# Skip binary files and very large files
try:
if os.path.getsize(filepath) > 100 * 1024 * 1024: # Skip files > 100MB
continue
except:
continue
files_to_search.append((filepath, self.ip, False))
if not files_to_search:
self.print_finding('info', "Keine Log-Dateien zum Durchsuchen gefunden")
return
self.print_finding('info', f"Durchsuche {len(files_to_search)} System-Log-Dateien parallel...")
# Use multiprocessing pool
found_in_files = {}
with Pool(processes=self.worker_count) as pool:
results = pool.map(search_single_file, files_to_search)
# Process results
for result in results:
if result:
filepath, lines = result
found_in_files[filepath] = lines
self.print_finding('warning', f"Gefunden in: {filepath} ({len(lines)} Zeilen)")
# Show first 3 lines
for line in lines[:3]:
self.print_finding('detail', line[:150])
if found_in_files:
self.results['logs']['files'] = found_in_files
else:
self.print_finding('info', "IP nicht in Log-Dateien gefunden")
def check_specific_services(self):
"""Check specific service logs"""
self.print_section("Service-spezifische Prüfungen")
services_to_check = [
('crowdsec', 'CrowdSec Service'),
('crowdsec-firewall-bouncer', 'CrowdSec Firewall Bouncer'),
('nginx', 'Nginx'),
('apache2', 'Apache'),
('dovecot', 'Dovecot'),
('postfix', 'Postfix'),
('fail2ban', 'Fail2ban')
]
for service, name in services_to_check:
stdout, _, code = self.run_command(
f'journalctl -u {service} --no-pager -n 500 | grep -i "{self.ip}"',
shell=True
)
if code == 0 and stdout:
lines = stdout.strip().split('\n')
self.print_finding('warning', f"{name}: {len(lines)} Einträge gefunden")
for line in lines[:5]:
self.print_finding('detail', line[:150])
self.results['services'][service] = lines
def check_crowdsec_scenarios(self):
"""Check which scenarios triggered for this IP"""
self.print_section("Ausgelöste Szenarien")
stdout, _, code = self.run_command(
f'cscli alerts list --ip {self.ip} -o json',
shell=False
)
scenarios = {}
if code == 0 and stdout:
try:
alerts = json.loads(stdout)
if alerts:
for alert in alerts:
scenario = alert.get('scenario', 'unknown')
count = alert.get('events_count', 1)
if scenario in scenarios:
scenarios[scenario] += count
else:
scenarios[scenario] = count
self.results['crowdsec']['alert_details'] = alerts
except json.JSONDecodeError:
pass
# Also check decisions for scenarios if we don't have alerts
if not scenarios and self.results['crowdsec'].get('decision_details'):
for decision in self.results['crowdsec']['decision_details']:
scenario = decision.get('scenario', 'unknown')
if scenario and scenario not in scenarios:
scenarios[scenario] = 1
if scenarios:
self.print_finding('warning', f"{len(scenarios)} verschiedene Szenarien:")
for scenario, count in sorted(scenarios.items(), key=lambda x: x[1], reverse=True):
self.print_finding('detail', f"{scenario}: {count} Events")
self.results['crowdsec']['scenarios'] = scenarios
else:
self.print_finding('info', "Keine Szenarien gefunden")
def show_triggering_log_lines(self):
"""Show the actual log lines that triggered the bans"""
if not self.results['crowdsec'].get('scenarios'):
return
self.print_section("Auslösende Log-Zeilen (Was führte zum Ban?)")
# Map scenarios to log file patterns and search terms
scenario_patterns = {
'crowdsecurity/postfix-relay-denied': {
'log_files': ['/var/log/maillog', '/var/log/mail.log'],
'patterns': ['Relay access denied', 'relay denied'],
'description': 'Postfix Relay-Versuche'
},
'crowdsecurity/postfix-non-smtp-command': {
'log_files': ['/var/log/maillog', '/var/log/mail.log'],
'patterns': ['non-SMTP command', 'Non-SMTP command'],
'description': 'Ungültige SMTP-Befehle'
},
'crowdsecurity/dovecot-spam': {
'log_files': ['/var/log/maillog', '/var/log/mail.log'],
'patterns': ['auth failed', 'authentication failed', 'Login aborted'],
'description': 'Dovecot Auth-Failures'
},
'crowdsecurity/http-probing': {
'log_files': ['/var/log/nginx/access.log', '/var/log/apache2/access.log'],
'patterns': ['404', '403'],
'description': 'HTTP Probing/Scanning'
},
'crowdsecurity/http-backdoors-attempts': {
'log_files': ['/var/log/nginx/access.log', '/var/log/apache2/access.log'],
'patterns': ['php', 'admin', 'wp-'],
'description': 'Backdoor-Versuche'
}
}
for scenario in self.results['crowdsec']['scenarios'].keys():
if scenario.startswith('Ticket'): # Skip whitelist "scenarios"
continue
self.print_finding('warning', f"\n{Colors.BOLD}Szenario: {scenario}{Colors.END}")
pattern_info = scenario_patterns.get(scenario)
if pattern_info:
self.print_finding('info', f"Suche nach: {pattern_info['description']}")
found_lines = []
for log_file in pattern_info['log_files']:
# Search current log file
if os.path.exists(log_file):
for pattern in pattern_info['patterns']:
cmd = f'grep -i "{self.ip}" "{log_file}" 2>/dev/null | grep -i "{pattern}" | tail -n 20'
stdout, _, code = self.run_command(cmd, shell=True)
if code == 0 and stdout:
lines = stdout.strip().split('\n')
found_lines.extend(lines)
# Also search compressed logs (last 3 rotations)
for i in range(1, 4):
compressed_log = f"{log_file}.{i}.gz"
if os.path.exists(compressed_log):
for pattern in pattern_info['patterns']:
cmd = f'zgrep -i "{self.ip}" "{compressed_log}" 2>/dev/null | grep -i "{pattern}" | tail -n 20'
stdout, _, code = self.run_command(cmd, shell=True)
if code == 0 and stdout:
lines = stdout.strip().split('\n')
found_lines.extend(lines)
if found_lines:
# Remove duplicates
found_lines = list(dict.fromkeys(found_lines))
self.print_finding('critical', f"Gefunden: {len(found_lines)} relevante Log-Zeile(n)")
for i, line in enumerate(found_lines[:15], 1): # Show max 15
self.print_finding('detail', f"{i}. {line[:200]}")
if len(found_lines) > 15:
self.print_finding('info', f"... und {len(found_lines)-15} weitere Zeilen")
else:
self.print_finding('info', "Keine spezifischen Log-Zeilen gefunden (möglicherweise bereits rotiert)")
else:
# Generic search for unknown scenarios
self.print_finding('info', "Kein spezifisches Such-Pattern für dieses Szenario definiert")
self.print_finding('info', f"Durchsuche maillog nach IP {self.ip}...")
# Try current and compressed logs
found = False
for log_path in ['/var/log/maillog', '/var/log/mail.log']:
if os.path.exists(log_path):
cmd = f'grep -i "{self.ip}" "{log_path}" 2>/dev/null | tail -n 10'
stdout, _, code = self.run_command(cmd, shell=True)
if code == 0 and stdout:
lines = stdout.strip().split('\n')
self.print_finding('warning', f"Letzte {len(lines)} Zeilen mit dieser IP:")
for line in lines:
self.print_finding('detail', line[:200])
found = True
break
if not found:
self.print_finding('info', "Keine Log-Zeilen gefunden")
def search_vhost_logs(self):
"""Search all vhost logs including compressed archives using parallel processing"""
self.print_section("VHost Logs durchsuchen (parallel, inkl. .gz Archive)")
vhosts_path = '/var/www/vhosts'
if not os.path.exists(vhosts_path):
self.print_finding('info', f"VHosts Pfad nicht gefunden: {vhosts_path}")
return
# Get all vhosts
try:
vhosts = [d for d in os.listdir(vhosts_path)
if os.path.isdir(os.path.join(vhosts_path, d))
and d not in ['chroot', 'default', 'system']]
except Exception as e:
self.print_finding('warning', f"Fehler beim Lesen von {vhosts_path}: {e}")
return
self.print_finding('info', f"Durchsuche {len(vhosts)} VHosts parallel...")
# Collect all log files to search
files_to_search = []
vhost_file_mapping = {} # Map filepath to (vhost, log_file)
for vhost in sorted(vhosts):
logs_dir = os.path.join(vhosts_path, vhost, 'logs')
if not os.path.exists(logs_dir):
continue
# Get all log files in this vhost
try:
log_files = os.listdir(logs_dir)
except Exception:
continue
# Add each log file to search list
for log_file in log_files:
log_path = os.path.join(logs_dir, log_file)
# Skip if not a file
if not os.path.isfile(log_path):
continue
files_to_search.append((log_path, self.ip, True))
vhost_file_mapping[log_path] = (vhost, log_file)
if not files_to_search:
self.print_finding('info', "Keine VHost-Log-Dateien zum Durchsuchen gefunden")
return
self.print_finding('info', f"Durchsuche {len(files_to_search)} VHost-Log-Dateien...")
# Use multiprocessing pool
vhost_stats = defaultdict(lambda: {'total': 0, 'by_log_type': defaultdict(int), 'sample_lines': []})
with Pool(processes=self.worker_count) as pool:
results = pool.map(search_single_file, files_to_search)
# Process results
for result in results:
if result:
filepath, lines = result
count = len(lines)
# Get vhost and log_file info
if filepath in vhost_file_mapping:
vhost, log_file = vhost_file_mapping[filepath]
# Determine log type
log_type = 'other'
if 'access' in log_file:
log_type = 'access'
elif 'error' in log_file:
log_type = 'error'
elif 'php-fpm' in log_file:
log_type = 'php-fpm'
elif 'proxy' in log_file:
log_type = 'proxy'
elif 'xferlog' in log_file:
log_type = 'xferlog'
vhost_stats[vhost]['total'] += count
vhost_stats[vhost]['by_log_type'][log_type] += count
# Save first 3 lines as sample
if len(vhost_stats[vhost]['sample_lines']) < 3:
for line in lines[:3]:
if len(vhost_stats[vhost]['sample_lines']) < 3:
vhost_stats[vhost]['sample_lines'].append((log_file, line))
# Display results
if vhost_stats:
self.results['vhosts'] = dict(vhost_stats)
# Sort by total count
sorted_vhosts = sorted(vhost_stats.items(), key=lambda x: x[1]['total'], reverse=True)
self.print_finding('warning', f"IP in {len(sorted_vhosts)} VHost(s) gefunden:")
print()
for vhost, stats in sorted_vhosts:
self.print_finding('warning', f"{Colors.BOLD}{vhost}{Colors.END}: {stats['total']} Einträge")
# Show breakdown by log type
for log_type, count in sorted(stats['by_log_type'].items(), key=lambda x: x[1], reverse=True):
self.print_finding('detail', f" {log_type}: {count}")
# Show sample lines
if stats['sample_lines']:
self.print_finding('detail', f" Beispiel-Zeilen:")
for log_file, line in stats['sample_lines']:
short_line = line[:120] + '...' if len(line) > 120 else line
self.print_finding('detail', f" [{log_file}] {short_line}")
print()
else:
self.print_finding('info', "IP nicht in VHost-Logs gefunden")
def print_summary(self):
"""Print summary of findings"""
self.print_header("ZUSAMMENFASSUNG")
# Critical findings
critical_findings = []
if self.results['crowdsec'].get('decisions'):
critical_findings.append("CrowdSec hat aktive Decisions für diese IP")
if self.results['firewall']:
critical_findings.append("IP ist in Firewall-Regeln vorhanden")
if self.results['bouncers']:
critical_findings.append("IP wird von Bouncers verarbeitet")
if critical_findings:
print(f"{Colors.RED}{Colors.BOLD}BLOCKIERUNGEN GEFUNDEN:{Colors.END}")
for finding in critical_findings:
self.print_finding('critical', finding)
# Show when IP was first blocked
if self.results['crowdsec'].get('decision_details'):
print(f"\n{Colors.YELLOW}{Colors.BOLD}ZEITLICHE ÜBERSICHT:{Colors.END}")
# Find earliest ban decision
ban_decisions = [d for d in self.results['crowdsec']['decision_details']
if d.get('type') == 'ban']
if ban_decisions:
# Sort by created_at
sorted_bans = sorted(ban_decisions, key=lambda x: x.get('created_at', ''))
earliest_ban = sorted_bans[0]
created_at = earliest_ban.get('created_at', '')
scenario = earliest_ban.get('scenario', 'unknown')
if created_at:
if HAS_DATEUTIL:
try:
import datetime as dt
created_dt = date_parser.parse(created_at)
now = dt.datetime.now(created_dt.tzinfo)
time_diff = now - created_dt
hours = time_diff.seconds // 3600
minutes = (time_diff.seconds % 3600) // 60
created_str = created_dt.strftime('%Y-%m-%d %H:%M:%S')
self.print_finding('warning', f"Erste Sperrung: {created_str} (vor {hours}h {minutes}m)")
self.print_finding('warning', f"Grund: {scenario}")
except:
pass
else:
created_str = created_at[:19] if len(created_at) >= 19 else created_at
self.print_finding('warning', f"Erste Sperrung: {created_str}")
self.print_finding('warning', f"Grund: {scenario}")
else:
print(f"{Colors.GREEN}{Colors.BOLD}KEINE AKTIVEN BLOCKIERUNGEN GEFUNDEN{Colors.END}")
# Scenarios summary
if self.results['crowdsec'].get('scenarios'):
print(f"\n{Colors.YELLOW}{Colors.BOLD}WARUM WURDE GEBLOCKT:{Colors.END}")
for scenario, count in list(self.results['crowdsec']['scenarios'].items())[:5]:
self.print_finding('warning', f"{scenario} ({count}x)")
# Log occurrences
total_log_entries = 0
if self.results['logs'].get('journalctl'):
total_log_entries += len(self.results['logs']['journalctl'])
if self.results['logs'].get('files'):
for lines in self.results['logs']['files'].values():
total_log_entries += len(lines)
# VHost statistics
if self.results.get('vhosts'):
total_vhost_entries = sum(stats['total'] for stats in self.results['vhosts'].values())
most_accessed_vhost = max(self.results['vhosts'].items(), key=lambda x: x[1]['total'])
print(f"\n{Colors.CYAN}{Colors.BOLD}VHOST STATISTIK:{Colors.END}")
self.print_finding('info', f"Insgesamt {total_vhost_entries} Einträge in {len(self.results['vhosts'])} VHost(s)")
self.print_finding('warning', f"Am häufigsten: {most_accessed_vhost[0]} ({most_accessed_vhost[1]['total']} Einträge)")
# Show top 5 vhosts
sorted_vhosts = sorted(self.results['vhosts'].items(), key=lambda x: x[1]['total'], reverse=True)
print(f"\n{Colors.BOLD}Top 5 VHosts:{Colors.END}")
for i, (vhost, stats) in enumerate(sorted_vhosts[:5], 1):
log_types = ', '.join([f"{lt}:{c}" for lt, c in sorted(stats['by_log_type'].items(), key=lambda x: x[1], reverse=True)][:3])
self.print_finding('detail', f"{i}. {vhost}: {stats['total']} ({log_types})")
if total_log_entries > 0:
print(f"\n{Colors.CYAN}System-Logs: {total_log_entries} Einträge{Colors.END}")
# Recommendations
print(f"\n{Colors.BOLD}{Colors.GREEN}EMPFEHLUNGEN:{Colors.END}")
if self.results['crowdsec'].get('decisions'):
self.print_finding('info', "1. Lösche alle CrowdSec Decisions: cscli decisions delete --ip " + self.ip)
if self.results['firewall']:
self.print_finding('info', "2. Prüfe Firewall-Regeln manuell und entferne IP falls nötig")
if not critical_findings:
self.print_finding('info', "Die IP ist nicht blockiert. Falls der Kunde Probleme hat:")
self.print_finding('info', " - Überprüfe E-Mail Client Konfiguration (Passwörter, Ports)")
self.print_finding('info', " - Prüfe auf TLS/SSL Version Probleme")
self.print_finding('info', " - Kontrolliere Rate-Limits in Dovecot/Postfix")
self.print_finding('info', "Für dauerhafte Whitelist: IP in /etc/crowdsec/parsers/s02-enrich/jtl_whitelist.yaml eintragen")
self.print_finding('info', "Nach Änderungen CrowdSec neu laden: systemctl reload crowdsec")
def prompt_unblock(self):
"""Prompt user if they want to unblock/whitelist the IP temporarily"""
# Check if we should prompt
has_active_blocks = bool(self.results['crowdsec'].get('decisions') or self.results['firewall'])
has_past_blocks = bool(self.results['crowdsec'].get('alerts') or
any('crowdsecurity/' in str(logs) for logs in self.results.get('logs', {}).values()))
if not has_active_blocks and not has_past_blocks:
# No blocks found at all, no need to ask
return
print(f"\n{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}")
if has_active_blocks:
print(f"{Colors.BOLD}{Colors.RED}⚠️ IP ist AKTUELL blockiert!{Colors.END}")
print(f"{Colors.BOLD}Möchtest du diese IP für 72 Stunden entsperren?{Colors.END}")
else:
print(f"{Colors.BOLD}{Colors.YELLOW} IP war in der Vergangenheit blockiert (aktuell frei){Colors.END}")
print(f"{Colors.BOLD}Möchtest du eine 72h Whitelist erstellen, um erneute Bans zu verhindern?{Colors.END}")
print(f"{Colors.WHITE}Dies wird:{Colors.END}")
if has_active_blocks:
print(f" 1. Alle aktuellen CrowdSec Decisions löschen")
print(f" 2. Eine temporäre Whitelist-Decision für 72h erstellen")
print(f" 3. Die IP aus der Firewall entfernen (falls vorhanden)")
else:
print(f" 1. Eine temporäre Whitelist-Decision für 72h erstellen")
print(f" 2. Verhindert, dass die IP in den nächsten 72h erneut gebannt wird")
print(f"{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}")
response = input(f"{Colors.GREEN}Whitelist erstellen? [j/n]: {Colors.END}").strip().lower()
if response in ['j', 'ja', 'y', 'yes']:
print(f"\n{Colors.CYAN}Erstelle Whitelist für IP {self.ip}...{Colors.END}\n")
step = 1
total_steps = 3 if has_active_blocks else 1
# Step 1: Delete all decisions if there are active blocks
if has_active_blocks:
self.print_finding('info', f"Schritt {step}/{total_steps}: Lösche alle Decisions...")
stdout, stderr, code = self.run_command(['cscli', 'decisions', 'delete', '--ip', self.ip])
if code == 0:
# Parse how many were deleted
if "decision(s) deleted" in stdout:
self.print_finding('info', stdout.strip())
else:
self.print_finding('info', "Decisions gelöscht")
else:
self.print_finding('warning', f"Fehler beim Löschen: {stderr}")
step += 1
# Step 2: Add whitelist decision for 72h
self.print_finding('info', f"Schritt {step}/{total_steps}: Erstelle 72h Whitelist-Decision...")
stdout, stderr, code = self.run_command([
'cscli', 'decisions', 'add',
'--ip', self.ip,
'--type', 'whitelist',
'--duration', '72h',
'--reason', 'Temporäre Whitelist via Analyzer-Script'
])
if code == 0:
self.print_finding('info', "Whitelist-Decision erfolgreich erstellt")
else:
self.print_finding('warning', f"Fehler beim Erstellen der Whitelist: {stderr}")
step += 1
# Step 3: Try to remove from ipset if present
if has_active_blocks and self.results['firewall'].get('ipset'):
self.print_finding('info', f"Schritt {step}/{total_steps}: Entferne IP aus ipset...")
# Try common crowdsec ipset names
removed = False
for i in range(5):
set_name = f'crowdsec-blacklists-{i}'
stdout, stderr, code = self.run_command(['ipset', 'test', set_name, self.ip])
if code == 0: # IP is in this set
stdout2, stderr2, code2 = self.run_command(['ipset', 'del', set_name, self.ip])
if code2 == 0:
self.print_finding('info', f"IP aus {set_name} entfernt")
removed = True
else:
self.print_finding('warning', f"Fehler beim Entfernen aus {set_name}: {stderr2}")
if not removed:
self.print_finding('info', "IP nicht in ipset gefunden oder bereits abgelaufen")
# Final summary
print(f"\n{Colors.GREEN}{Colors.BOLD}✓ Fertig!{Colors.END}")
if has_active_blocks:
print(f"{Colors.GREEN}Die IP {self.ip} wurde entsperrt und ist jetzt für 72 Stunden gewhitelistet.{Colors.END}")
else:
print(f"{Colors.GREEN}Die IP {self.ip} ist jetzt für 72 Stunden gewhitelistet.{Colors.END}")
print(f"{Colors.YELLOW}Hinweis: Die Whitelist läuft automatisch nach 72h ab.{Colors.END}")
# Verify
print(f"\n{Colors.CYAN}Überprüfe neuen Status...{Colors.END}")
stdout, stderr, code = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip])
if code == 0 and stdout:
lines = [l for l in stdout.split('\n') if self.ip in l and not l.startswith('') and not l.startswith('│ ID')]
if lines:
for line in lines:
if 'whitelist' in line:
self.print_finding('info', line)
else:
print(f"\n{Colors.YELLOW}Abgebrochen. Keine Änderungen vorgenommen.{Colors.END}")
def analyze(self):
"""Run full analysis"""
self.print_header(f"CrowdSec IP Analyse für: {self.ip}")
print(f"{Colors.BOLD}Analyse gestartet um: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.END}\n")
# Run all checks
self.check_crowdsec_decisions()
self.check_crowdsec_alerts()
self.check_crowdsec_scenarios()
self.check_bouncers()
self.check_firewall_rules()
self.search_systemd_journal()
self.check_specific_services()
self.search_log_files()
self.search_vhost_logs()
# Show triggering log lines
self.show_triggering_log_lines()
# Print summary
self.print_summary()
# Prompt for unblocking
self.prompt_unblock()
def main():
parser = argparse.ArgumentParser(
description='Comprehensive CrowdSec IP blocking analysis (including VHost logs)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s 78.43.28.75
%(prog)s 192.168.1.100
This script searches:
- CrowdSec decisions and alerts
- Firewall rules (iptables, nftables, ipset)
- System logs (/var/log/*)
- Service logs (journalctl)
- VHost logs (/var/www/vhosts/*/logs/) including .gz archives
'''
)
parser.add_argument('ip', help='IP address to analyze')
args = parser.parse_args()
# Check if running as root
if os.geteuid() != 0:
print(f"{Colors.RED}Warnung: Dieses Script sollte als root ausgeführt werden für vollständige Ergebnisse{Colors.END}")
print(f"{Colors.YELLOW}Manche Befehle könnten fehlschlagen.{Colors.END}\n")
analyzer = IPAnalyzer(args.ip)
analyzer.analyze()
if __name__ == '__main__':
main()