check-if-ip-banned.py aktualisiert
This commit is contained in:
@@ -13,7 +13,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import argparse
|
import argparse
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
@@ -714,14 +714,19 @@ class IPAnalyzer:
|
|||||||
# Critical findings
|
# Critical findings
|
||||||
critical_findings = []
|
critical_findings = []
|
||||||
|
|
||||||
if self.results['crowdsec'].get('decisions'):
|
# Check if there are actual ban decisions (not whitelist)
|
||||||
critical_findings.append("CrowdSec hat aktive Decisions für diese IP")
|
has_ban_decisions = False
|
||||||
|
if self.results['crowdsec'].get('decision_details'):
|
||||||
|
has_ban_decisions = any(d.get('type') == 'ban' for d in self.results['crowdsec']['decision_details'])
|
||||||
|
|
||||||
|
if has_ban_decisions:
|
||||||
|
critical_findings.append("CrowdSec hat aktive ban-Decisions für diese IP")
|
||||||
|
|
||||||
if self.results['firewall']:
|
if self.results['firewall']:
|
||||||
critical_findings.append("IP ist in Firewall-Regeln vorhanden")
|
critical_findings.append("IP ist in Firewall-Regeln vorhanden")
|
||||||
|
|
||||||
if self.results['bouncers']:
|
if has_ban_decisions and self.results['bouncers']:
|
||||||
critical_findings.append("IP wird von Bouncers verarbeitet")
|
critical_findings.append("IP wird von Bouncers aktiv blockiert")
|
||||||
|
|
||||||
if critical_findings:
|
if critical_findings:
|
||||||
print(f"{Colors.RED}{Colors.BOLD}BLOCKIERUNGEN GEFUNDEN:{Colors.END}")
|
print(f"{Colors.RED}{Colors.BOLD}BLOCKIERUNGEN GEFUNDEN:{Colors.END}")
|
||||||
@@ -767,11 +772,16 @@ class IPAnalyzer:
|
|||||||
else:
|
else:
|
||||||
print(f"{Colors.GREEN}{Colors.BOLD}KEINE AKTIVEN BLOCKIERUNGEN GEFUNDEN{Colors.END}")
|
print(f"{Colors.GREEN}{Colors.BOLD}KEINE AKTIVEN BLOCKIERUNGEN GEFUNDEN{Colors.END}")
|
||||||
|
|
||||||
# Scenarios summary
|
# Scenarios summary - only show real ban scenarios, not whitelist entries
|
||||||
if self.results['crowdsec'].get('scenarios'):
|
if self.results['crowdsec'].get('scenarios'):
|
||||||
print(f"\n{Colors.YELLOW}{Colors.BOLD}WARUM WURDE GEBLOCKT:{Colors.END}")
|
# Filter out whitelist/ticket scenarios
|
||||||
for scenario, count in list(self.results['crowdsec']['scenarios'].items())[:5]:
|
real_scenarios = {k: v for k, v in self.results['crowdsec']['scenarios'].items()
|
||||||
self.print_finding('warning', f"{scenario} ({count}x)")
|
if not (k.startswith('Ticket') or 'Whitelist' in k or 'whitelist' in k)}
|
||||||
|
|
||||||
|
if real_scenarios:
|
||||||
|
print(f"\n{Colors.YELLOW}{Colors.BOLD}WARUM WURDE GEBLOCKT:{Colors.END}")
|
||||||
|
for scenario, count in list(real_scenarios.items())[:5]:
|
||||||
|
self.print_finding('warning', f"{scenario} ({count}x)")
|
||||||
|
|
||||||
# Log occurrences
|
# Log occurrences
|
||||||
total_log_entries = 0
|
total_log_entries = 0
|
||||||
@@ -803,8 +813,8 @@ class IPAnalyzer:
|
|||||||
# Recommendations
|
# Recommendations
|
||||||
print(f"\n{Colors.BOLD}{Colors.GREEN}EMPFEHLUNGEN:{Colors.END}")
|
print(f"\n{Colors.BOLD}{Colors.GREEN}EMPFEHLUNGEN:{Colors.END}")
|
||||||
|
|
||||||
if self.results['crowdsec'].get('decisions'):
|
if has_ban_decisions:
|
||||||
self.print_finding('info', "1. Lösche alle CrowdSec Decisions: cscli decisions delete --ip " + self.ip)
|
self.print_finding('info', "1. Nutze die AllowList-Funktion dieses Scripts (wird gleich gefragt)")
|
||||||
|
|
||||||
if self.results['firewall']:
|
if self.results['firewall']:
|
||||||
self.print_finding('info', "2. Prüfe Firewall-Regeln manuell und entferne IP falls nötig")
|
self.print_finding('info', "2. Prüfe Firewall-Regeln manuell und entferne IP falls nötig")
|
||||||
@@ -819,9 +829,14 @@ class IPAnalyzer:
|
|||||||
self.print_finding('info', "Nach Änderungen CrowdSec neu laden: systemctl reload crowdsec")
|
self.print_finding('info', "Nach Änderungen CrowdSec neu laden: systemctl reload crowdsec")
|
||||||
|
|
||||||
def prompt_unblock(self):
|
def prompt_unblock(self):
|
||||||
"""Prompt user if they want to unblock/whitelist the IP temporarily"""
|
"""Prompt user if they want to unblock/whitelist the IP temporarily using AllowLists"""
|
||||||
# Check if we should prompt
|
# Check if we should prompt
|
||||||
has_active_blocks = bool(self.results['crowdsec'].get('decisions') or self.results['firewall'])
|
# Check for actual ban decisions (not whitelist)
|
||||||
|
has_ban_decisions = False
|
||||||
|
if self.results['crowdsec'].get('decision_details'):
|
||||||
|
has_ban_decisions = any(d.get('type') == 'ban' for d in self.results['crowdsec']['decision_details'])
|
||||||
|
|
||||||
|
has_active_blocks = bool(has_ban_decisions or self.results['firewall'])
|
||||||
has_past_blocks = bool(self.results['crowdsec'].get('alerts') or
|
has_past_blocks = bool(self.results['crowdsec'].get('alerts') or
|
||||||
any('crowdsecurity/' in str(logs) for logs in self.results.get('logs', {}).values()))
|
any('crowdsecurity/' in str(logs) for logs in self.results.get('logs', {}).values()))
|
||||||
|
|
||||||
@@ -836,25 +851,29 @@ class IPAnalyzer:
|
|||||||
print(f"{Colors.BOLD}Möchtest du diese IP für 72 Stunden entsperren?{Colors.END}")
|
print(f"{Colors.BOLD}Möchtest du diese IP für 72 Stunden entsperren?{Colors.END}")
|
||||||
else:
|
else:
|
||||||
print(f"{Colors.BOLD}{Colors.YELLOW}ℹ️ IP war in der Vergangenheit blockiert (aktuell frei){Colors.END}")
|
print(f"{Colors.BOLD}{Colors.YELLOW}ℹ️ IP war in der Vergangenheit blockiert (aktuell frei){Colors.END}")
|
||||||
print(f"{Colors.BOLD}Möchtest du eine 72h Whitelist erstellen, um erneute Bans zu verhindern?{Colors.END}")
|
print(f"{Colors.BOLD}Möchtest du eine 72h AllowList erstellen, um erneute Bans zu verhindern?{Colors.END}")
|
||||||
|
|
||||||
print(f"{Colors.WHITE}Dies wird:{Colors.END}")
|
print(f"{Colors.WHITE}Dies wird:{Colors.END}")
|
||||||
if has_active_blocks:
|
if has_active_blocks:
|
||||||
print(f" 1. Alle aktuellen CrowdSec Decisions löschen")
|
print(f" 1. Alle aktuellen CrowdSec Decisions löschen")
|
||||||
print(f" 2. Eine temporäre Whitelist-Decision für 72h erstellen")
|
print(f" 2. IP zur AllowList 'temp-whitelist-from-check-if-ip-banned' hinzufügen (72h)")
|
||||||
print(f" 3. Die IP aus der Firewall entfernen (falls vorhanden)")
|
print(f" 3. Die IP aus der Firewall entfernen (falls vorhanden)")
|
||||||
else:
|
else:
|
||||||
print(f" 1. Eine temporäre Whitelist-Decision für 72h erstellen")
|
print(f" 1. IP zur AllowList 'temp-whitelist-from-check-if-ip-banned' hinzufügen (72h)")
|
||||||
print(f" 2. Verhindert, dass die IP in den nächsten 72h erneut gebannt wird")
|
print(f" 2. Verhindert, dass die IP in den nächsten 72h erneut gebannt wird")
|
||||||
|
print(f"{Colors.WHITE}Hinweis: AllowLists funktionieren sofort (kein Neustart nötig){Colors.END}")
|
||||||
print(f"{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}")
|
print(f"{Colors.BOLD}{Colors.YELLOW}{'='*80}{Colors.END}")
|
||||||
|
|
||||||
response = input(f"{Colors.GREEN}Whitelist erstellen? [j/n]: {Colors.END}").strip().lower()
|
response = input(f"{Colors.GREEN}AllowList erstellen? [j/n]: {Colors.END}").strip().lower()
|
||||||
|
|
||||||
if response in ['j', 'ja', 'y', 'yes']:
|
if response in ['j', 'ja', 'y', 'yes']:
|
||||||
print(f"\n{Colors.CYAN}Erstelle Whitelist für IP {self.ip}...{Colors.END}\n")
|
print(f"\n{Colors.CYAN}Erstelle AllowList für IP {self.ip}...{Colors.END}\n")
|
||||||
|
|
||||||
|
allowlist_name = "temp-whitelist-from-check-if-ip-banned"
|
||||||
|
expiration_display = (datetime.now() + timedelta(hours=72)).strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
step = 1
|
step = 1
|
||||||
total_steps = 3 if has_active_blocks else 1
|
total_steps = 3 if has_active_blocks else 2
|
||||||
|
|
||||||
# Step 1: Delete all decisions if there are active blocks
|
# Step 1: Delete all decisions if there are active blocks
|
||||||
if has_active_blocks:
|
if has_active_blocks:
|
||||||
@@ -871,23 +890,52 @@ class IPAnalyzer:
|
|||||||
self.print_finding('warning', f"Fehler beim Löschen: {stderr}")
|
self.print_finding('warning', f"Fehler beim Löschen: {stderr}")
|
||||||
step += 1
|
step += 1
|
||||||
|
|
||||||
# Step 2: Add whitelist decision for 72h
|
# Step 2: Check if AllowList exists, create if not
|
||||||
self.print_finding('info', f"Schritt {step}/{total_steps}: Erstelle 72h Whitelist-Decision...")
|
self.print_finding('info', f"Schritt {step}/{total_steps}: Prüfe AllowList '{allowlist_name}'...")
|
||||||
|
stdout, stderr, code = self.run_command(['cscli', 'allowlist', 'list', '-o', 'json'])
|
||||||
|
|
||||||
|
allowlist_exists = False
|
||||||
|
if code == 0 and stdout:
|
||||||
|
try:
|
||||||
|
allowlists = json.loads(stdout)
|
||||||
|
allowlist_exists = any(al.get('name') == allowlist_name for al in allowlists)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not allowlist_exists:
|
||||||
|
self.print_finding('info', f"AllowList existiert nicht, erstelle '{allowlist_name}'...")
|
||||||
|
stdout, stderr, code = self.run_command([
|
||||||
|
'cscli', 'allowlist', 'create', allowlist_name,
|
||||||
|
'-d', 'Temporäre AllowList für entsperrte IPs (72h)'
|
||||||
|
])
|
||||||
|
|
||||||
|
if code == 0:
|
||||||
|
self.print_finding('info', "AllowList erfolgreich erstellt")
|
||||||
|
else:
|
||||||
|
self.print_finding('warning', f"Fehler beim Erstellen der AllowList: {stderr}")
|
||||||
|
else:
|
||||||
|
self.print_finding('info', "AllowList existiert bereits")
|
||||||
|
|
||||||
|
step += 1
|
||||||
|
|
||||||
|
# Step 3: Add IP to AllowList with 72h expiration
|
||||||
|
self.print_finding('info', f"Schritt {step}/{total_steps}: Füge IP zur AllowList hinzu (72h)...")
|
||||||
|
|
||||||
|
# CRITICAL: Use duration format '72h', NOT absolute timestamp
|
||||||
stdout, stderr, code = self.run_command([
|
stdout, stderr, code = self.run_command([
|
||||||
'cscli', 'decisions', 'add',
|
'cscli', 'allowlist', 'add', allowlist_name, self.ip,
|
||||||
'--ip', self.ip,
|
'--expiration', '72h',
|
||||||
'--type', 'whitelist',
|
'--comment', f'Via Analyzer-Script am {datetime.now().strftime("%Y-%m-%d %H:%M")}'
|
||||||
'--duration', '72h',
|
|
||||||
'--reason', 'Temporäre Whitelist via Analyzer-Script'
|
|
||||||
])
|
])
|
||||||
|
|
||||||
if code == 0:
|
if code == 0:
|
||||||
self.print_finding('info', "Whitelist-Decision erfolgreich erstellt")
|
self.print_finding('info', "IP erfolgreich zur AllowList hinzugefügt")
|
||||||
else:
|
else:
|
||||||
self.print_finding('warning', f"Fehler beim Erstellen der Whitelist: {stderr}")
|
self.print_finding('warning', f"Fehler beim Hinzufügen zur AllowList: {stderr}")
|
||||||
|
|
||||||
step += 1
|
step += 1
|
||||||
|
|
||||||
# Step 3: Try to remove from ipset if present
|
# Step 4: Try to remove from ipset if present
|
||||||
if has_active_blocks and self.results['firewall'].get('ipset'):
|
if has_active_blocks and self.results['firewall'].get('ipset'):
|
||||||
self.print_finding('info', f"Schritt {step}/{total_steps}: Entferne IP aus ipset...")
|
self.print_finding('info', f"Schritt {step}/{total_steps}: Entferne IP aus ipset...")
|
||||||
|
|
||||||
@@ -911,20 +959,19 @@ class IPAnalyzer:
|
|||||||
# Final summary
|
# Final summary
|
||||||
print(f"\n{Colors.GREEN}{Colors.BOLD}✓ Fertig!{Colors.END}")
|
print(f"\n{Colors.GREEN}{Colors.BOLD}✓ Fertig!{Colors.END}")
|
||||||
if has_active_blocks:
|
if has_active_blocks:
|
||||||
print(f"{Colors.GREEN}Die IP {self.ip} wurde entsperrt und ist jetzt für 72 Stunden gewhitelistet.{Colors.END}")
|
print(f"{Colors.GREEN}Die IP {self.ip} wurde entsperrt und zur AllowList hinzugefügt.{Colors.END}")
|
||||||
else:
|
else:
|
||||||
print(f"{Colors.GREEN}Die IP {self.ip} ist jetzt für 72 Stunden gewhitelistet.{Colors.END}")
|
print(f"{Colors.GREEN}Die IP {self.ip} wurde zur AllowList hinzugefügt.{Colors.END}")
|
||||||
print(f"{Colors.YELLOW}Hinweis: Die Whitelist läuft automatisch nach 72h ab.{Colors.END}")
|
print(f"{Colors.YELLOW}Hinweis: Der Eintrag läuft automatisch nach 72h ab (ca. {expiration_display}).{Colors.END}")
|
||||||
|
|
||||||
# Verify
|
# Verify
|
||||||
print(f"\n{Colors.CYAN}Überprüfe neuen Status...{Colors.END}")
|
print(f"\n{Colors.CYAN}Überprüfe AllowList-Status...{Colors.END}")
|
||||||
stdout, stderr, code = self.run_command(['cscli', 'decisions', 'list', '--ip', self.ip])
|
stdout, stderr, code = self.run_command(['cscli', 'allowlist', 'inspect', allowlist_name])
|
||||||
if code == 0 and stdout:
|
if code == 0 and stdout:
|
||||||
lines = [l for l in stdout.split('\n') if self.ip in l and not l.startswith('╭') and not l.startswith('│ ID')]
|
# Show only the relevant lines with our IP
|
||||||
if lines:
|
for line in stdout.split('\n'):
|
||||||
for line in lines:
|
if self.ip in line or 'Value' in line or '────' in line:
|
||||||
if 'whitelist' in line:
|
print(f"{Colors.WHITE}{line}{Colors.END}")
|
||||||
self.print_finding('info', line)
|
|
||||||
else:
|
else:
|
||||||
print(f"\n{Colors.YELLOW}Abgebrochen. Keine Änderungen vorgenommen.{Colors.END}")
|
print(f"\n{Colors.YELLOW}Abgebrochen. Keine Änderungen vorgenommen.{Colors.END}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user