diff --git a/check-if-ip-banned.py b/check-if-ip-banned.py new file mode 100644 index 0000000..64dfa9a --- /dev/null +++ b/check-if-ip-banned.py @@ -0,0 +1,987 @@ +#!/usr/bin/env python3 +""" +CrowdSec IP Analyzer - Comprehensive IP blocking analysis +Searches all logs, configs, services AND vhost logs (including .gz archives) + +Optional dependency for better timestamp parsing: + pip install python-dateutil --break-system-packages + (Script works without it, but timestamps will be less formatted) +""" + +import subprocess +import json +import os +import sys +import re +from datetime import datetime +from pathlib import Path +import argparse +from collections import defaultdict +from multiprocessing import Pool, cpu_count +from functools import partial + +# Try to import dateutil for better timestamp parsing, but continue without it +try: + from dateutil import parser as date_parser + HAS_DATEUTIL = True +except ImportError: + HAS_DATEUTIL = False + +class Colors: + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + END = '\033[0m' + +def search_single_file(args): + """Worker function to search a single file for an IP address""" + filepath, ip_address, is_vhost = args + + try: + # Determine if it's a compressed file + if filepath.endswith('.gz'): + cmd = f'zgrep -i "{ip_address}" "{filepath}" 2>/dev/null | head -n 100' + else: + cmd = f'grep -i "{ip_address}" "{filepath}" 2>/dev/null | head -n 100' + + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) + + if result.returncode == 0 and result.stdout: + lines = result.stdout.strip().split('\n') + return (filepath, lines) + else: + return None + except Exception: + return None + +class IPAnalyzer: + def __init__(self, ip_address): + self.ip = ip_address + self.results = { + 'crowdsec': {}, + 'bouncers': {}, + 'firewall': {}, + 'logs': {}, + 'services': {}, + 'vhosts': {} + } + # Calculate worker count: total CPUs - 6, minimum 1 + total_cpus = cpu_count() + self.worker_count = max(1, total_cpus - 6) + print(f"{Colors.CYAN}Verwende {self.worker_count} CPU-Cores für parallele Verarbeitung (von {total_cpus} verfügbar){Colors.END}") + + def print_header(self, text): + print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}") + print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}") + print(f"{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}\n") + + def print_section(self, text): + print(f"\n{Colors.BOLD}{Colors.YELLOW}>>> {text}{Colors.END}") + + def print_finding(self, severity, text): + colors = { + 'critical': Colors.RED, + 'warning': Colors.YELLOW, + 'info': Colors.GREEN, + 'detail': Colors.WHITE + } + color = colors.get(severity, Colors.WHITE) + symbols = { + 'critical': '❌', + 'warning': '⚠️ ', + 'info': '✓', + 'detail': ' →' + } + symbol = symbols.get(severity, '•') + print(f"{color}{symbol} {text}{Colors.END}") + + def run_command(self, cmd, shell=False): + """Run command and return output""" + try: + if shell: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) + else: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return result.stdout, result.stderr, result.returncode + except subprocess.TimeoutExpired: + return "", "Command timeout", 1 + except Exception as e: + return "", str(e), 1 + + def check_crowdsec_decisions(self): + """Check CrowdSec decisions for this IP""" + self.print_section("CrowdSec Decisions") + + # Get human-readable output + stdout, stderr, code = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip]) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + # Filter for actual decision lines (not headers/separators) + decision_lines = [l for l in lines if self.ip in l and not l.startswith('╭') and not l.startswith('│ ID')] + + if decision_lines: + self.print_finding('critical', f"Gefunden: {len(decision_lines)} Decision(s)") + for line in decision_lines: + self.print_finding('detail', line) + self.results['crowdsec']['decisions'] = decision_lines + + # Get detailed JSON output with timestamps + stdout_json, _, code_json = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip, '-o', 'json']) + + if code_json == 0 and stdout_json: + try: + decisions = json.loads(stdout_json) + if decisions: + print(f"\n{Colors.BOLD}Zeitstempel der Decisions:{Colors.END}") + for decision in decisions: + decision_type = decision.get('type', 'unknown') + scenario = decision.get('scenario', 'unknown') + created_at = decision.get('created_at', '') + duration = decision.get('duration', '') + + # Parse and format timestamp + if created_at: + if HAS_DATEUTIL: + try: + dt = date_parser.parse(created_at) + created_str = dt.strftime('%Y-%m-%d %H:%M:%S') + except: + created_str = created_at + else: + # Fallback: just show the raw timestamp + created_str = created_at[:19] if len(created_at) >= 19 else created_at + else: + created_str = 'N/A' + + color = Colors.RED if decision_type == 'ban' else Colors.YELLOW if decision_type == 'whitelist' else Colors.WHITE + self.print_finding('detail', f"{color}[{decision_type}] {scenario} - Erstellt: {created_str} - Dauer: {duration}{Colors.END}") + + self.results['crowdsec']['decision_details'] = decisions + except json.JSONDecodeError: + pass + else: + self.print_finding('info', "Keine aktiven Decisions gefunden") + else: + self.print_finding('warning', f"Fehler beim Abrufen der Decisions: {stderr}") + + def check_crowdsec_alerts(self): + """Check CrowdSec alerts for this IP""" + self.print_section("CrowdSec Alerts (mit Zeitstempeln)") + + stdout, stderr, code = self.run_command(['cscli', 'alerts', 'list', '--ip', self.ip, '-l', '20']) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + alert_lines = [l for l in lines if self.ip in l or 'scenario' in l.lower()] + + if len(alert_lines) > 1: # More than just header + self.print_finding('warning', f"Gefunden: {len(alert_lines)-1} Alert(s)") + for line in alert_lines[:10]: # Show first 10 + self.print_finding('detail', line) + self.results['crowdsec']['alerts'] = alert_lines + + # Get JSON output for better timestamp parsing + stdout_json, _, code_json = self.run_command(['cscli', 'alerts', 'list', '--ip', self.ip, '-l', '20', '-o', 'json']) + + if code_json == 0 and stdout_json: + try: + alerts = json.loads(stdout_json) + if alerts: + print(f"\n{Colors.BOLD}Chronologische Alert-Übersicht:{Colors.END}") + # Sort by created_at + sorted_alerts = sorted(alerts, key=lambda x: x.get('created_at', ''), reverse=True) + + for alert in sorted_alerts[:10]: + scenario = alert.get('scenario', 'unknown') + created_at = alert.get('created_at', '') + decisions = alert.get('decisions', []) + decision_type = decisions[0].get('type', 'unknown') if decisions else 'no decision' + + # Parse timestamp + if created_at: + if HAS_DATEUTIL: + try: + dt = date_parser.parse(created_at) + created_str = dt.strftime('%Y-%m-%d %H:%M:%S') + except: + created_str = created_at + else: + created_str = created_at[:19] if len(created_at) >= 19 else created_at + else: + created_str = 'N/A' + + color = Colors.RED if decision_type == 'ban' else Colors.YELLOW if decision_type in ['whitelist', '(simul)ban'] else Colors.WHITE + self.print_finding('detail', f"{color}{created_str} - [{decision_type}] {scenario}{Colors.END}") + except json.JSONDecodeError: + pass + else: + self.print_finding('info', "Keine Alerts gefunden") + else: + self.print_finding('warning', f"Fehler beim Abrufen der Alerts: {stderr}") + + def check_bouncers(self): + """Check all registered bouncers""" + self.print_section("CrowdSec Bouncers") + + stdout, stderr, code = self.run_command(['cscli', 'bouncers', 'list']) + + if code == 0 and stdout: + print(stdout) + self.results['bouncers']['registered'] = stdout + + # Try to get bouncer metrics + stdout2, _, code2 = self.run_command(['cscli', 'metrics', 'show', 'bouncers']) + if code2 == 0 and stdout2: + print(stdout2) + self.results['bouncers']['metrics'] = stdout2 + else: + self.print_finding('warning', f"Fehler beim Abrufen der Bouncers: {stderr}") + + def check_firewall_rules(self): + """Check firewall rules (iptables, nftables)""" + self.print_section("Firewall Regeln") + + # Check iptables + for table in ['filter', 'nat', 'mangle']: + stdout, _, code = self.run_command(['iptables', '-t', table, '-L', '-n', '-v']) + if code == 0 and self.ip in stdout: + self.print_finding('critical', f"IP gefunden in iptables table '{table}':") + for line in stdout.split('\n'): + if self.ip in line: + self.print_finding('detail', line) + self.results['firewall'][f'iptables_{table}'] = [l for l in stdout.split('\n') if self.ip in l] + + # Check ip6tables + for table in ['filter', 'nat', 'mangle']: + stdout, _, code = self.run_command(['ip6tables', '-t', table, '-L', '-n', '-v']) + if code == 0 and self.ip in stdout: + self.print_finding('critical', f"IP gefunden in ip6tables table '{table}':") + for line in stdout.split('\n'): + if self.ip in line: + self.print_finding('detail', line) + + # Check nftables + stdout, _, code = self.run_command(['nft', 'list', 'ruleset']) + if code == 0 and self.ip in stdout: + self.print_finding('critical', "IP gefunden in nftables:") + for line in stdout.split('\n'): + if self.ip in line: + self.print_finding('detail', line) + self.results['firewall']['nftables'] = [l for l in stdout.split('\n') if self.ip in l] + + # Check ipset with timeout information + stdout, _, code = self.run_command(['ipset', 'list']) + if code == 0 and self.ip in stdout: + self.print_finding('critical', "IP gefunden in ipset:") + + # Parse ipset output to find the IP and its timeout + current_set = None + for line in stdout.split('\n'): + if line.startswith('Name:'): + current_set = line.split('Name:')[1].strip() + elif self.ip in line: + self.print_finding('detail', f"Set: {current_set}") + self.print_finding('detail', line) + + # Extract timeout if present + if 'timeout' in line: + match = re.search(r'timeout (\d+)', line) + if match: + timeout_seconds = int(match.group(1)) + timeout_minutes = timeout_seconds // 60 + timeout_hours = timeout_minutes // 60 + remaining_minutes = timeout_minutes % 60 + + # Get decision creation time to calculate when it was added + if self.results['crowdsec'].get('decision_details'): + for decision in self.results['crowdsec']['decision_details']: + created_at = decision.get('created_at', '') + duration = decision.get('duration', '') + + if created_at: + if HAS_DATEUTIL: + try: + import datetime as dt + created_dt = date_parser.parse(created_at) + created_str = created_dt.strftime('%Y-%m-%d %H:%M:%S') + + self.print_finding('warning', + f"Hinzugefügt um: {created_str} (läuft ab in {timeout_hours}h {remaining_minutes}m)") + except: + self.print_finding('warning', + f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)") + else: + created_str = created_at[:19] if len(created_at) >= 19 else created_at + self.print_finding('warning', + f"Hinzugefügt um: {created_str} (läuft ab in {timeout_hours}h {remaining_minutes}m)") + else: + self.print_finding('warning', + f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)") + else: + self.print_finding('warning', + f"Verbleibende Zeit: {timeout_hours}h {remaining_minutes}m ({timeout_seconds}s)") + + self.results['firewall']['ipset'] = [l for l in stdout.split('\n') if self.ip in l] + + if not self.results['firewall']: + self.print_finding('info', "IP nicht in Firewall-Regeln gefunden") + + def search_systemd_journal(self): + """Search systemd journal for IP""" + self.print_section("Systemd Journal") + + # Search all journal entries + stdout, _, code = self.run_command( + f'journalctl --no-pager -n 1000 | grep -i "{self.ip}"', + shell=True + ) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + self.print_finding('warning', f"IP gefunden in {len(lines)} Journal-Einträgen") + + # Show first 20 lines + for line in lines[:20]: + self.print_finding('detail', line[:150]) + + if len(lines) > 20: + self.print_finding('info', f"... und {len(lines)-20} weitere Einträge") + + self.results['logs']['journalctl'] = lines + else: + self.print_finding('info', "IP nicht im Journal gefunden") + + def search_log_files(self): + """Search all log files in /var/log using parallel processing""" + self.print_section("System Log-Dateien durchsuchen (parallel)") + + log_dirs = ['/var/log'] + files_to_search = [] + + for log_dir in log_dirs: + if not os.path.exists(log_dir): + continue + + # Find all log files + for root, dirs, files in os.walk(log_dir): + # Skip some directories + dirs[:] = [d for d in dirs if d not in ['journal', 'private']] + + for file in files: + filepath = os.path.join(root, file) + + # Skip binary files and very large files + try: + if os.path.getsize(filepath) > 100 * 1024 * 1024: # Skip files > 100MB + continue + except: + continue + + files_to_search.append((filepath, self.ip, False)) + + if not files_to_search: + self.print_finding('info', "Keine Log-Dateien zum Durchsuchen gefunden") + return + + self.print_finding('info', f"Durchsuche {len(files_to_search)} System-Log-Dateien parallel...") + + # Use multiprocessing pool + found_in_files = {} + with Pool(processes=self.worker_count) as pool: + results = pool.map(search_single_file, files_to_search) + + # Process results + for result in results: + if result: + filepath, lines = result + found_in_files[filepath] = lines + self.print_finding('warning', f"Gefunden in: {filepath} ({len(lines)} Zeilen)") + + # Show first 3 lines + for line in lines[:3]: + self.print_finding('detail', line[:150]) + + if found_in_files: + self.results['logs']['files'] = found_in_files + else: + self.print_finding('info', "IP nicht in Log-Dateien gefunden") + + def check_specific_services(self): + """Check specific service logs""" + self.print_section("Service-spezifische Prüfungen") + + services_to_check = [ + ('crowdsec', 'CrowdSec Service'), + ('crowdsec-firewall-bouncer', 'CrowdSec Firewall Bouncer'), + ('nginx', 'Nginx'), + ('apache2', 'Apache'), + ('dovecot', 'Dovecot'), + ('postfix', 'Postfix'), + ('fail2ban', 'Fail2ban') + ] + + for service, name in services_to_check: + stdout, _, code = self.run_command( + f'journalctl -u {service} --no-pager -n 500 | grep -i "{self.ip}"', + shell=True + ) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + self.print_finding('warning', f"{name}: {len(lines)} Einträge gefunden") + for line in lines[:5]: + self.print_finding('detail', line[:150]) + self.results['services'][service] = lines + + def check_crowdsec_scenarios(self): + """Check which scenarios triggered for this IP""" + self.print_section("Ausgelöste Szenarien") + + stdout, _, code = self.run_command( + f'cscli alerts list --ip {self.ip} -o json', + shell=False + ) + + scenarios = {} + + if code == 0 and stdout: + try: + alerts = json.loads(stdout) + if alerts: + for alert in alerts: + scenario = alert.get('scenario', 'unknown') + count = alert.get('events_count', 1) + if scenario in scenarios: + scenarios[scenario] += count + else: + scenarios[scenario] = count + + self.results['crowdsec']['alert_details'] = alerts + except json.JSONDecodeError: + pass + + # Also check decisions for scenarios if we don't have alerts + if not scenarios and self.results['crowdsec'].get('decision_details'): + for decision in self.results['crowdsec']['decision_details']: + scenario = decision.get('scenario', 'unknown') + if scenario and scenario not in scenarios: + scenarios[scenario] = 1 + + if scenarios: + self.print_finding('warning', f"{len(scenarios)} verschiedene Szenarien:") + for scenario, count in sorted(scenarios.items(), key=lambda x: x[1], reverse=True): + self.print_finding('detail', f"{scenario}: {count} Events") + + self.results['crowdsec']['scenarios'] = scenarios + else: + self.print_finding('info', "Keine Szenarien gefunden") + + def show_triggering_log_lines(self): + """Show the actual log lines that triggered the bans""" + if not self.results['crowdsec'].get('scenarios'): + return + + self.print_section("Auslösende Log-Zeilen (Was führte zum Ban?)") + + # Map scenarios to log file patterns and search terms + scenario_patterns = { + 'crowdsecurity/postfix-relay-denied': { + 'log_files': ['/var/log/maillog', '/var/log/mail.log'], + 'patterns': ['Relay access denied', 'relay denied'], + 'description': 'Postfix Relay-Versuche' + }, + 'crowdsecurity/postfix-non-smtp-command': { + 'log_files': ['/var/log/maillog', '/var/log/mail.log'], + 'patterns': ['non-SMTP command', 'Non-SMTP command'], + 'description': 'Ungültige SMTP-Befehle' + }, + 'crowdsecurity/dovecot-spam': { + 'log_files': ['/var/log/maillog', '/var/log/mail.log'], + 'patterns': ['auth failed', 'authentication failed', 'Login aborted'], + 'description': 'Dovecot Auth-Failures' + }, + 'crowdsecurity/http-probing': { + 'log_files': ['/var/log/nginx/access.log', '/var/log/apache2/access.log'], + 'patterns': ['404', '403'], + 'description': 'HTTP Probing/Scanning' + }, + 'crowdsecurity/http-backdoors-attempts': { + 'log_files': ['/var/log/nginx/access.log', '/var/log/apache2/access.log'], + 'patterns': ['php', 'admin', 'wp-'], + 'description': 'Backdoor-Versuche' + } + } + + for scenario in self.results['crowdsec']['scenarios'].keys(): + if scenario.startswith('Ticket'): # Skip whitelist "scenarios" + continue + + self.print_finding('warning', f"\n{Colors.BOLD}Szenario: {scenario}{Colors.END}") + + pattern_info = scenario_patterns.get(scenario) + + if pattern_info: + self.print_finding('info', f"Suche nach: {pattern_info['description']}") + + found_lines = [] + for log_file in pattern_info['log_files']: + # Search current log file + if os.path.exists(log_file): + for pattern in pattern_info['patterns']: + cmd = f'grep -i "{self.ip}" "{log_file}" 2>/dev/null | grep -i "{pattern}" | tail -n 20' + stdout, _, code = self.run_command(cmd, shell=True) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + found_lines.extend(lines) + + # Also search compressed logs (last 3 rotations) + for i in range(1, 4): + compressed_log = f"{log_file}.{i}.gz" + if os.path.exists(compressed_log): + for pattern in pattern_info['patterns']: + cmd = f'zgrep -i "{self.ip}" "{compressed_log}" 2>/dev/null | grep -i "{pattern}" | tail -n 20' + stdout, _, code = self.run_command(cmd, shell=True) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + found_lines.extend(lines) + + if found_lines: + # Remove duplicates + found_lines = list(dict.fromkeys(found_lines)) + + self.print_finding('critical', f"Gefunden: {len(found_lines)} relevante Log-Zeile(n)") + for i, line in enumerate(found_lines[:15], 1): # Show max 15 + self.print_finding('detail', f"{i}. {line[:200]}") + + if len(found_lines) > 15: + self.print_finding('info', f"... und {len(found_lines)-15} weitere Zeilen") + else: + self.print_finding('info', "Keine spezifischen Log-Zeilen gefunden (möglicherweise bereits rotiert)") + else: + # Generic search for unknown scenarios + self.print_finding('info', "Kein spezifisches Such-Pattern für dieses Szenario definiert") + self.print_finding('info', f"Durchsuche maillog nach IP {self.ip}...") + + # Try current and compressed logs + found = False + for log_path in ['/var/log/maillog', '/var/log/mail.log']: + if os.path.exists(log_path): + cmd = f'grep -i "{self.ip}" "{log_path}" 2>/dev/null | tail -n 10' + stdout, _, code = self.run_command(cmd, shell=True) + + if code == 0 and stdout: + lines = stdout.strip().split('\n') + self.print_finding('warning', f"Letzte {len(lines)} Zeilen mit dieser IP:") + for line in lines: + self.print_finding('detail', line[:200]) + found = True + break + + if not found: + self.print_finding('info', "Keine Log-Zeilen gefunden") + + def search_vhost_logs(self): + """Search all vhost logs including compressed archives using parallel processing""" + self.print_section("VHost Logs durchsuchen (parallel, inkl. .gz Archive)") + + vhosts_path = '/var/www/vhosts' + + if not os.path.exists(vhosts_path): + self.print_finding('info', f"VHosts Pfad nicht gefunden: {vhosts_path}") + return + + # Get all vhosts + try: + vhosts = [d for d in os.listdir(vhosts_path) + if os.path.isdir(os.path.join(vhosts_path, d)) + and d not in ['chroot', 'default', 'system']] + except Exception as e: + self.print_finding('warning', f"Fehler beim Lesen von {vhosts_path}: {e}") + return + + self.print_finding('info', f"Durchsuche {len(vhosts)} VHosts parallel...") + + # Collect all log files to search + files_to_search = [] + vhost_file_mapping = {} # Map filepath to (vhost, log_file) + + for vhost in sorted(vhosts): + logs_dir = os.path.join(vhosts_path, vhost, 'logs') + + if not os.path.exists(logs_dir): + continue + + # Get all log files in this vhost + try: + log_files = os.listdir(logs_dir) + except Exception: + continue + + # Add each log file to search list + for log_file in log_files: + log_path = os.path.join(logs_dir, log_file) + + # Skip if not a file + if not os.path.isfile(log_path): + continue + + files_to_search.append((log_path, self.ip, True)) + vhost_file_mapping[log_path] = (vhost, log_file) + + if not files_to_search: + self.print_finding('info', "Keine VHost-Log-Dateien zum Durchsuchen gefunden") + return + + self.print_finding('info', f"Durchsuche {len(files_to_search)} VHost-Log-Dateien...") + + # Use multiprocessing pool + vhost_stats = defaultdict(lambda: {'total': 0, 'by_log_type': defaultdict(int), 'sample_lines': []}) + + with Pool(processes=self.worker_count) as pool: + results = pool.map(search_single_file, files_to_search) + + # Process results + for result in results: + if result: + filepath, lines = result + count = len(lines) + + # Get vhost and log_file info + if filepath in vhost_file_mapping: + vhost, log_file = vhost_file_mapping[filepath] + + # Determine log type + log_type = 'other' + if 'access' in log_file: + log_type = 'access' + elif 'error' in log_file: + log_type = 'error' + elif 'php-fpm' in log_file: + log_type = 'php-fpm' + elif 'proxy' in log_file: + log_type = 'proxy' + elif 'xferlog' in log_file: + log_type = 'xferlog' + + vhost_stats[vhost]['total'] += count + vhost_stats[vhost]['by_log_type'][log_type] += count + + # Save first 3 lines as sample + if len(vhost_stats[vhost]['sample_lines']) < 3: + for line in lines[:3]: + if len(vhost_stats[vhost]['sample_lines']) < 3: + vhost_stats[vhost]['sample_lines'].append((log_file, line)) + + # Display results + if vhost_stats: + self.results['vhosts'] = dict(vhost_stats) + + # Sort by total count + sorted_vhosts = sorted(vhost_stats.items(), key=lambda x: x[1]['total'], reverse=True) + + self.print_finding('warning', f"IP in {len(sorted_vhosts)} VHost(s) gefunden:") + print() + + for vhost, stats in sorted_vhosts: + self.print_finding('warning', f"{Colors.BOLD}{vhost}{Colors.END}: {stats['total']} Einträge") + + # Show breakdown by log type + for log_type, count in sorted(stats['by_log_type'].items(), key=lambda x: x[1], reverse=True): + self.print_finding('detail', f" {log_type}: {count}") + + # Show sample lines + if stats['sample_lines']: + self.print_finding('detail', f" Beispiel-Zeilen:") + for log_file, line in stats['sample_lines']: + short_line = line[:120] + '...' if len(line) > 120 else line + self.print_finding('detail', f" [{log_file}] {short_line}") + print() + else: + self.print_finding('info', "IP nicht in VHost-Logs gefunden") + + def print_summary(self): + """Print summary of findings""" + self.print_header("ZUSAMMENFASSUNG") + + # Critical findings + critical_findings = [] + + if self.results['crowdsec'].get('decisions'): + critical_findings.append("CrowdSec hat aktive Decisions für diese IP") + + if self.results['firewall']: + critical_findings.append("IP ist in Firewall-Regeln vorhanden") + + if self.results['bouncers']: + critical_findings.append("IP wird von Bouncers verarbeitet") + + if critical_findings: + print(f"{Colors.RED}{Colors.BOLD}BLOCKIERUNGEN GEFUNDEN:{Colors.END}") + for finding in critical_findings: + self.print_finding('critical', finding) + + # Show when IP was first blocked + if self.results['crowdsec'].get('decision_details'): + print(f"\n{Colors.YELLOW}{Colors.BOLD}ZEITLICHE ÜBERSICHT:{Colors.END}") + + # Find earliest ban decision + ban_decisions = [d for d in self.results['crowdsec']['decision_details'] + if d.get('type') == 'ban'] + + if ban_decisions: + # Sort by created_at + sorted_bans = sorted(ban_decisions, key=lambda x: x.get('created_at', '')) + earliest_ban = sorted_bans[0] + + created_at = earliest_ban.get('created_at', '') + scenario = earliest_ban.get('scenario', 'unknown') + + if created_at: + if HAS_DATEUTIL: + try: + import datetime as dt + created_dt = date_parser.parse(created_at) + now = dt.datetime.now(created_dt.tzinfo) + time_diff = now - created_dt + + hours = time_diff.seconds // 3600 + minutes = (time_diff.seconds % 3600) // 60 + + created_str = created_dt.strftime('%Y-%m-%d %H:%M:%S') + self.print_finding('warning', f"Erste Sperrung: {created_str} (vor {hours}h {minutes}m)") + self.print_finding('warning', f"Grund: {scenario}") + except: + pass + else: + created_str = created_at[:19] if len(created_at) >= 19 else created_at + self.print_finding('warning', f"Erste Sperrung: {created_str}") + self.print_finding('warning', f"Grund: {scenario}") + else: + print(f"{Colors.GREEN}{Colors.BOLD}KEINE AKTIVEN BLOCKIERUNGEN GEFUNDEN{Colors.END}") + + # Scenarios summary + if self.results['crowdsec'].get('scenarios'): + print(f"\n{Colors.YELLOW}{Colors.BOLD}WARUM WURDE GEBLOCKT:{Colors.END}") + for scenario, count in list(self.results['crowdsec']['scenarios'].items())[:5]: + self.print_finding('warning', f"{scenario} ({count}x)") + + # Log occurrences + total_log_entries = 0 + if self.results['logs'].get('journalctl'): + total_log_entries += len(self.results['logs']['journalctl']) + if self.results['logs'].get('files'): + for lines in self.results['logs']['files'].values(): + total_log_entries += len(lines) + + # VHost statistics + if self.results.get('vhosts'): + total_vhost_entries = sum(stats['total'] for stats in self.results['vhosts'].values()) + most_accessed_vhost = max(self.results['vhosts'].items(), key=lambda x: x[1]['total']) + + print(f"\n{Colors.CYAN}{Colors.BOLD}VHOST STATISTIK:{Colors.END}") + self.print_finding('info', f"Insgesamt {total_vhost_entries} Einträge in {len(self.results['vhosts'])} VHost(s)") + self.print_finding('warning', f"Am häufigsten: {most_accessed_vhost[0]} ({most_accessed_vhost[1]['total']} Einträge)") + + # Show top 5 vhosts + sorted_vhosts = sorted(self.results['vhosts'].items(), key=lambda x: x[1]['total'], reverse=True) + print(f"\n{Colors.BOLD}Top 5 VHosts:{Colors.END}") + for i, (vhost, stats) in enumerate(sorted_vhosts[:5], 1): + log_types = ', '.join([f"{lt}:{c}" for lt, c in sorted(stats['by_log_type'].items(), key=lambda x: x[1], reverse=True)][:3]) + self.print_finding('detail', f"{i}. {vhost}: {stats['total']} ({log_types})") + + if total_log_entries > 0: + print(f"\n{Colors.CYAN}System-Logs: {total_log_entries} Einträge{Colors.END}") + + # Recommendations + print(f"\n{Colors.BOLD}{Colors.GREEN}EMPFEHLUNGEN:{Colors.END}") + + if self.results['crowdsec'].get('decisions'): + self.print_finding('info', "1. Lösche alle CrowdSec Decisions: cscli decisions delete --ip " + self.ip) + + if self.results['firewall']: + self.print_finding('info', "2. Prüfe Firewall-Regeln manuell und entferne IP falls nötig") + + if not critical_findings: + self.print_finding('info', "Die IP ist nicht blockiert. Falls der Kunde Probleme hat:") + self.print_finding('info', " - Überprüfe E-Mail Client Konfiguration (Passwörter, Ports)") + self.print_finding('info', " - Prüfe auf TLS/SSL Version Probleme") + self.print_finding('info', " - Kontrolliere Rate-Limits in Dovecot/Postfix") + + self.print_finding('info', "Für dauerhafte Whitelist: IP in /etc/crowdsec/parsers/s02-enrich/jtl_whitelist.yaml eintragen") + self.print_finding('info', "Nach Änderungen CrowdSec neu laden: systemctl reload crowdsec") + + def prompt_unblock(self): + """Prompt user if they want to unblock/whitelist the IP temporarily""" + # Check if we should prompt + has_active_blocks = bool(self.results['crowdsec'].get('decisions') or self.results['firewall']) + has_past_blocks = bool(self.results['crowdsec'].get('alerts') or + any('crowdsecurity/' in str(logs) for logs in self.results.get('logs', {}).values())) + + if not has_active_blocks and not has_past_blocks: + # No blocks found at all, no need to ask + return + + print(f"\n{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}") + + if has_active_blocks: + print(f"{Colors.BOLD}{Colors.RED}⚠️ IP ist AKTUELL blockiert!{Colors.END}") + print(f"{Colors.BOLD}Möchtest du diese IP für 72 Stunden entsperren?{Colors.END}") + else: + print(f"{Colors.BOLD}{Colors.YELLOW}ℹ️ IP war in der Vergangenheit blockiert (aktuell frei){Colors.END}") + print(f"{Colors.BOLD}Möchtest du eine 72h Whitelist erstellen, um erneute Bans zu verhindern?{Colors.END}") + + print(f"{Colors.WHITE}Dies wird:{Colors.END}") + if has_active_blocks: + print(f" 1. Alle aktuellen CrowdSec Decisions löschen") + print(f" 2. Eine temporäre Whitelist-Decision für 72h erstellen") + print(f" 3. Die IP aus der Firewall entfernen (falls vorhanden)") + else: + print(f" 1. Eine temporäre Whitelist-Decision für 72h erstellen") + print(f" 2. Verhindert, dass die IP in den nächsten 72h erneut gebannt wird") + print(f"{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}") + + response = input(f"{Colors.GREEN}Whitelist erstellen? [j/n]: {Colors.END}").strip().lower() + + if response in ['j', 'ja', 'y', 'yes']: + print(f"\n{Colors.CYAN}Erstelle Whitelist für IP {self.ip}...{Colors.END}\n") + + step = 1 + total_steps = 3 if has_active_blocks else 1 + + # Step 1: Delete all decisions if there are active blocks + if has_active_blocks: + self.print_finding('info', f"Schritt {step}/{total_steps}: Lösche alle Decisions...") + stdout, stderr, code = self.run_command(['cscli', 'decisions', 'delete', '--ip', self.ip]) + + if code == 0: + # Parse how many were deleted + if "decision(s) deleted" in stdout: + self.print_finding('info', stdout.strip()) + else: + self.print_finding('info', "Decisions gelöscht") + else: + self.print_finding('warning', f"Fehler beim Löschen: {stderr}") + step += 1 + + # Step 2: Add whitelist decision for 72h + self.print_finding('info', f"Schritt {step}/{total_steps}: Erstelle 72h Whitelist-Decision...") + stdout, stderr, code = self.run_command([ + 'cscli', 'decisions', 'add', + '--ip', self.ip, + '--type', 'whitelist', + '--duration', '72h', + '--reason', 'Temporäre Whitelist via Analyzer-Script' + ]) + + if code == 0: + self.print_finding('info', "Whitelist-Decision erfolgreich erstellt") + else: + self.print_finding('warning', f"Fehler beim Erstellen der Whitelist: {stderr}") + step += 1 + + # Step 3: Try to remove from ipset if present + if has_active_blocks and self.results['firewall'].get('ipset'): + self.print_finding('info', f"Schritt {step}/{total_steps}: Entferne IP aus ipset...") + + # Try common crowdsec ipset names + removed = False + for i in range(5): + set_name = f'crowdsec-blacklists-{i}' + stdout, stderr, code = self.run_command(['ipset', 'test', set_name, self.ip]) + + if code == 0: # IP is in this set + stdout2, stderr2, code2 = self.run_command(['ipset', 'del', set_name, self.ip]) + if code2 == 0: + self.print_finding('info', f"IP aus {set_name} entfernt") + removed = True + else: + self.print_finding('warning', f"Fehler beim Entfernen aus {set_name}: {stderr2}") + + if not removed: + self.print_finding('info', "IP nicht in ipset gefunden oder bereits abgelaufen") + + # Final summary + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ Fertig!{Colors.END}") + if has_active_blocks: + print(f"{Colors.GREEN}Die IP {self.ip} wurde entsperrt und ist jetzt für 72 Stunden gewhitelistet.{Colors.END}") + else: + print(f"{Colors.GREEN}Die IP {self.ip} ist jetzt für 72 Stunden gewhitelistet.{Colors.END}") + print(f"{Colors.YELLOW}Hinweis: Die Whitelist läuft automatisch nach 72h ab.{Colors.END}") + + # Verify + print(f"\n{Colors.CYAN}Überprüfe neuen Status...{Colors.END}") + stdout, stderr, code = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip]) + if code == 0 and stdout: + lines = [l for l in stdout.split('\n') if self.ip in l and not l.startswith('╭') and not l.startswith('│ ID')] + if lines: + for line in lines: + if 'whitelist' in line: + self.print_finding('info', line) + else: + print(f"\n{Colors.YELLOW}Abgebrochen. Keine Änderungen vorgenommen.{Colors.END}") + + def analyze(self): + """Run full analysis""" + self.print_header(f"CrowdSec IP Analyse für: {self.ip}") + + print(f"{Colors.BOLD}Analyse gestartet um: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.END}\n") + + # Run all checks + self.check_crowdsec_decisions() + self.check_crowdsec_alerts() + self.check_crowdsec_scenarios() + self.check_bouncers() + self.check_firewall_rules() + self.search_systemd_journal() + self.check_specific_services() + self.search_log_files() + self.search_vhost_logs() + + # Show triggering log lines + self.show_triggering_log_lines() + + # Print summary + self.print_summary() + + # Prompt for unblocking + self.prompt_unblock() + +def main(): + parser = argparse.ArgumentParser( + description='Comprehensive CrowdSec IP blocking analysis (including VHost logs)', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + %(prog)s 78.43.28.75 + %(prog)s 192.168.1.100 + +This script searches: +- CrowdSec decisions and alerts +- Firewall rules (iptables, nftables, ipset) +- System logs (/var/log/*) +- Service logs (journalctl) +- VHost logs (/var/www/vhosts/*/logs/) including .gz archives + ''' + ) + parser.add_argument('ip', help='IP address to analyze') + + args = parser.parse_args() + + # Check if running as root + if os.geteuid() != 0: + print(f"{Colors.RED}Warnung: Dieses Script sollte als root ausgeführt werden für vollständige Ergebnisse{Colors.END}") + print(f"{Colors.YELLOW}Manche Befehle könnten fehlschlagen.{Colors.END}\n") + + analyzer = IPAnalyzer(args.ip) + analyzer.analyze() + +if __name__ == '__main__': + main() \ No newline at end of file