Files
geoip_shop_manager/geoip_shop_manager.py

1146 lines
37 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GeoIP Shop Blocker Manager - DACH Version
2-Component System: PHP blocking + Python watcher (systemd service)
Blocks all IPs outside Germany, Austria, and Switzerland (DACH region)
"""
import os
import sys
import shutil
import subprocess
import json
import time
from datetime import datetime, timedelta
from pathlib import Path
# Configuration
# Directory that holds one sub-directory per shop (each with an httpdocs/ folder).
VHOSTS_DIR = "/var/www/vhosts"
# Suffix appended to "index.php" to name the backup copy taken on activation.
BACKUP_SUFFIX = ".geoip_backup"
# Names of the files created inside a shop's httpdocs/ directory.
BLOCKING_FILE = "geoip_blocking.php"
CACHE_FILE = "dach_ip_ranges.cache"
LOG_FILE = "geoip_blocked.log"
CROWDSEC_QUEUE_FILE = "geoip_crowdsec_queue.log"
# System-wide paths: watcher script, its systemd unit, and the JSON registry
# of shops with active blocking (read by the watcher).
WATCHER_SCRIPT = "/usr/local/bin/geoip_crowdsec_watcher.py"
SYSTEMD_SERVICE = "/etc/systemd/system/geoip-crowdsec-watcher.service"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
# PHP GeoIP blocking script (no exec, just logging)
#
# This is a str.format() template: literal PHP braces are doubled and the
# placeholders are expiry_date, expiry_timestamp, cache_file, log_file,
# crowdsec_queue and shop_name.
#
# The script fails OPEN whenever it cannot classify a visitor (IPv6 address,
# no usable range list): wrongly blocking would also queue the visitor for a
# 72h CrowdSec ban.
GEOIP_SCRIPT = '''<?php
/**
 * GeoIP Blocking Script - Blocks all non-DACH IPs
 * DACH = Germany (DE), Austria (AT), Switzerland (CH)
 * Logs blocked IPs for CrowdSec watcher to process
 * Valid until: {expiry_date}
 */

// Auto-disable after 72 hours
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) {{
    return; // Script expired, allow all traffic
}}

// Get visitor IP
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) {{
    return;
}}

// Skip private IPs
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {{
    return;
}}

// The zone files below only list IPv4 networks and ip2long() cannot parse
// IPv6, so an IPv6 visitor cannot be classified. Let it through instead of
// blocking (and banning) DACH customers that connect over IPv6.
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) === false) {{
    return;
}}

// Files
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400; // 24 hours
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';

// Function to download DACH IP ranges (Germany, Austria, Switzerland).
// Returns an empty array unless ALL country lists could be fetched: a
// partial list would block an entire country for the cache lifetime.
function download_dach_ranges() {{
    $ranges = [];
    $countries = ['de', 'at', 'ch']; // Germany, Austria, Switzerland
    // Keep a slow upstream from stalling shop requests
    $context = stream_context_create(['http' => ['timeout' => 5]]);
    foreach ($countries as $country) {{
        $url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
        $content = @file_get_contents($url, false, $context);
        if ($content === false) {{
            return [];
        }}
        $lines = explode("\\n", trim($content));
        foreach ($lines as $line) {{
            $line = trim($line);
            if (!empty($line) && strpos($line, '/') !== false) {{
                $ranges[] = $line;
            }}
        }}
    }}
    return $ranges;
}}

// Function to check if IP is in CIDR range (IPv4 only; false on bad input)
function ip_in_range($ip, $cidr) {{
    $parts = explode('/', $cidr, 2);
    if (count($parts) !== 2) {{
        return false;
    }}
    $ip_long = ip2long($ip);
    $subnet_long = ip2long($parts[0]);
    $mask = (int)$parts[1];
    if ($ip_long === false || $subnet_long === false || $mask < 0 || $mask > 32) {{
        return false;
    }}
    $mask_long = -1 << (32 - $mask);
    return ($ip_long & $mask_long) == ($subnet_long & $mask_long);
}}

// Load or download IP ranges
$dach_ranges = [];
$cache_fresh = false;
if (file_exists($cache_file)) {{
    // The cache lives in the web root: never instantiate objects from it
    $cached = @unserialize((string)@file_get_contents($cache_file), ['allowed_classes' => false]);
    if (is_array($cached) && !empty($cached)) {{
        $dach_ranges = $cached;
        $cache_fresh = (time() - filemtime($cache_file)) < $cache_duration;
    }}
}}
if (!$cache_fresh) {{
    $downloaded = download_dach_ranges();
    if (!empty($downloaded)) {{
        $dach_ranges = $downloaded;
        @file_put_contents($cache_file, serialize($dach_ranges), LOCK_EX);
    }} elseif (!empty($dach_ranges)) {{
        // Download failed: keep using the stale list and retry in 5 minutes
        // instead of on every single request.
        @touch($cache_file, time() - $cache_duration + 300);
    }}
}}

// Without any range data nobody can be classified: fail open rather than
// answering 403 to (and banning) every visitor.
if (empty($dach_ranges)) {{
    return;
}}

// Check if visitor IP is from DACH region
$is_dach = false;
foreach ($dach_ranges as $range) {{
    if (ip_in_range($visitor_ip, $range)) {{
        $is_dach = true;
        break;
    }}
}}

// Block non-DACH IPs
if (!$is_dach) {{
    $timestamp = date('Y-m-d H:i:s');
    $user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
    $request_uri = $_SERVER['REQUEST_URI'] ?? '/';
    // Log for humans
    $log_entry = "[$timestamp] IP: $visitor_ip | UA: $user_agent | URI: $request_uri\\n";
    @file_put_contents($log_file, $log_entry, FILE_APPEND | LOCK_EX);
    // Queue for CrowdSec (simple format)
    $queue_entry = "$timestamp|$visitor_ip|{shop_name}\\n";
    @file_put_contents($crowdsec_queue, $queue_entry, FILE_APPEND | LOCK_EX);
    header('HTTP/1.1 403 Forbidden');
    exit;
}}
'''
# Python watcher script (runs as systemd service).
#
# The text below is written verbatim to WATCHER_SCRIPT and executed as root.
# Each queue file is claimed with an atomic rename before it is read, so
# entries PHP appends while the IPs are being processed land in a fresh file
# instead of being truncated away, and root never opens a path inside the
# web root for writing.
WATCHER_SCRIPT_CONTENT = '''#!/usr/bin/env python3
"""
GeoIP CrowdSec Watcher Service
Monitors queue files and adds blocked IPs to CrowdSec
"""
import ipaddress
import json
import os
import subprocess
import time
from datetime import datetime

VHOSTS_DIR = "/var/www/vhosts"
QUEUE_FILE = "geoip_crowdsec_queue.log"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
PROCESSED_IPS = {}  # In-memory cache to avoid re-adding same IP
PROCESSED_TTL = 3600  # Seconds an IP stays in the cache
CHECK_INTERVAL = 5  # Check every 5 seconds


def log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)


def get_active_shops():
    """Get list of shops with active GeoIP blocking"""
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or half-written registry: nothing to do yet
        return {}
    return shops if isinstance(shops, dict) else {}


def prune_processed_ips():
    """Forget cache entries older than PROCESSED_TTL so the cache stays bounded"""
    now = time.time()
    for ip in [ip for ip, seen in PROCESSED_IPS.items() if now - seen >= PROCESSED_TTL]:
        del PROCESSED_IPS[ip]


def add_to_crowdsec(ip, shop):
    """Add IP to CrowdSec with 72h ban"""
    # The queue file lives in the web root, so treat its content as untrusted
    try:
        ip = str(ipaddress.ip_address(ip.strip()))
    except ValueError:
        log(f"⚠️ Skipping invalid queue entry for {shop}")
        return False
    # Check if already processed recently (within last hour)
    now = time.time()
    if ip in PROCESSED_IPS and (now - PROCESSED_IPS[ip]) < PROCESSED_TTL:
        return True
    cmd = [
        'cscli', 'decisions', 'add',
        '--ip', ip,
        '--duration', '72h',
        '--type', 'ban',
        '--reason', f'GeoIP: Non-DACH IP blocked by {shop}'
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except Exception as e:
        log(f"❌ Error adding {ip}: {e}")
        return False
    if result.returncode == 0:
        PROCESSED_IPS[ip] = now
        log(f"✅ Added {ip} to CrowdSec (from {shop})")
        return True
    log(f"⚠️ Failed to add {ip}: {result.stderr.strip()}")
    return False


def process_queue_file(shop_path, shop):
    """Process queue file for a shop"""
    queue_file = os.path.join(shop_path, 'httpdocs', QUEUE_FILE)
    work_file = queue_file + '.processing'
    # Only regular files: never follow a symlink planted in the web root
    if os.path.islink(queue_file) or not os.path.isfile(queue_file):
        return 0
    try:
        # Atomically claim the current queue; new entries go to a new file
        os.replace(queue_file, work_file)
    except OSError as e:
        log(f"❌ Error processing {shop}: {e}")
        return 0
    processed = 0
    try:
        with open(work_file, 'r', errors='replace') as f:
            lines = f.readlines()
        for line in lines:
            parts = line.strip().split('|')
            # Format: timestamp|ip|shop
            if len(parts) >= 2 and add_to_crowdsec(parts[1], shop):
                processed += 1
    except Exception as e:
        log(f"❌ Error processing {shop}: {e}")
    finally:
        try:
            os.remove(work_file)
        except OSError:
            pass
    return processed


def main():
    log("🚀 GeoIP CrowdSec Watcher started (DACH mode)")
    while True:
        try:
            prune_processed_ips()
            total_processed = 0
            for shop in get_active_shops():
                shop_path = os.path.join(VHOSTS_DIR, shop)
                if os.path.isdir(shop_path):
                    total_processed += process_queue_file(shop_path, shop)
            if total_processed > 0:
                log(f"📊 Processed {total_processed} IPs in this cycle")
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            log("👋 Shutting down...")
            break
        except Exception as e:
            log(f"❌ Error in main loop: {e}")
            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
'''
# Systemd unit for the watcher, assembled directive by directive; every
# directive ends with a newline.
SYSTEMD_SERVICE_CONTENT = "".join(
    f"{directive}\n"
    for directive in (
        "[Unit]",
        "Description=GeoIP CrowdSec Watcher Service (DACH)",
        "After=network.target crowdsec.service",
        "Wants=crowdsec.service",
        "[Service]",
        "Type=simple",
        "ExecStart=/usr/bin/python3 /usr/local/bin/geoip_crowdsec_watcher.py",
        "Restart=always",
        "RestartSec=10",
        "StandardOutput=journal",
        "StandardError=journal",
        "[Install]",
        "WantedBy=multi-user.target",
    )
)
def run_command(cmd, capture_output=True):
    """Run a command and return ``(returncode, stdout, stderr)``.

    Args:
        cmd: Either a shell command line (``str``), which is passed to the
            shell so pipes and redirections work, or an argument list, which
            is executed directly without any shell parsing. Use the list form
            whenever an argument comes from data (shop names, IPs).
        capture_output: When true, stdout/stderr are captured and returned;
            otherwise they go to the terminal and empty strings are returned.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. Any failure to run the
        command (missing binary, timeout after 30 seconds, ...) is reported
        as ``(-1, "", <error message>)`` instead of raising.
    """
    # Only a plain string needs the shell; a list must not be combined with
    # shell=True (the extra items would become arguments of the shell itself).
    use_shell = isinstance(cmd, str)
    try:
        result = subprocess.run(
            cmd,
            shell=use_shell,
            capture_output=capture_output,
            text=True,
            timeout=30,
        )
    except Exception as e:
        return -1, "", str(e)
    if capture_output:
        return result.returncode, result.stdout, result.stderr
    return result.returncode, "", ""
def check_crowdsec():
    """Return True when systemd reports the crowdsec unit as active."""
    status, output, _ = run_command("systemctl is-active crowdsec")
    if status != 0:
        return False
    return output.strip() == "active"
def install_watcher_service():
    """Install the watcher script and its systemd unit, then start the service.

    Writes WATCHER_SCRIPT_CONTENT to WATCHER_SCRIPT (mode 0755) and
    SYSTEMD_SERVICE_CONTENT to SYSTEMD_SERVICE, reloads systemd, enables and
    starts the unit.

    Returns:
        True if the unit reports "active" one second after starting it,
        False otherwise.
    """
    print(" 📦 Installiere CrowdSec-Watcher-Service...")
    # Create watcher script. The script contains non-ASCII characters, so the
    # encoding is pinned instead of depending on the locale of the root shell.
    with open(WATCHER_SCRIPT, 'w', encoding='utf-8') as f:
        f.write(WATCHER_SCRIPT_CONTENT)
    os.chmod(WATCHER_SCRIPT, 0o755)
    print(f" ✅ Watcher-Script erstellt: {WATCHER_SCRIPT}")
    # Create systemd service
    with open(SYSTEMD_SERVICE, 'w', encoding='utf-8') as f:
        f.write(SYSTEMD_SERVICE_CONTENT)
    print(f" ✅ Systemd-Service erstellt: {SYSTEMD_SERVICE}")
    # Reload systemd and start service
    run_command("systemctl daemon-reload")
    run_command("systemctl enable geoip-crowdsec-watcher.service")
    run_command("systemctl start geoip-crowdsec-watcher.service")
    # Give the unit a moment to come up before checking it
    time.sleep(1)
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    if code == 0 and stdout.strip() == "active":
        print(" ✅ Service gestartet und läuft")
        return True
    print(" ⚠️ Service konnte nicht gestartet werden")
    return False
def uninstall_watcher_service():
    """Stop and disable the watcher unit, delete its files and reload systemd."""
    print(" 📦 Deinstalliere CrowdSec-Watcher-Service...")

    # Take the unit down before its files disappear
    for action in ("stop", "disable"):
        run_command(f"systemctl {action} geoip-crowdsec-watcher.service")

    # Unit file first, then the script; each is reported only if it existed
    removals = (
        (SYSTEMD_SERVICE, f" 🗑️ Service-Datei gelöscht"),
        (WATCHER_SCRIPT, f" 🗑️ Watcher-Script gelöscht"),
    )
    for target, notice in removals:
        if os.path.isfile(target):
            os.remove(target)
            print(notice)

    run_command("systemctl daemon-reload")
    print(" ✅ Service deinstalliert")
def add_shop_to_active(shop):
    """Record *shop* in the active-shops registry (ACTIVE_SHOPS_FILE).

    The entry stores the activation time and the expiry 72 hours later, both
    as ISO-8601 strings. An existing entry for the same shop is overwritten.

    Args:
        shop: Shop (vhost directory) name used as the registry key.
    """
    os.makedirs(os.path.dirname(ACTIVE_SHOPS_FILE), exist_ok=True)
    shops = {}
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        pass
    except ValueError:
        # Corrupt registry: start over instead of aborting the activation
        pass
    else:
        if isinstance(loaded, dict):
            shops = loaded
    # Sample the clock once so "expiry" is exactly 72h after "activated"
    activated = datetime.now()
    shops[shop] = {
        "activated": activated.isoformat(),
        "expiry": (activated + timedelta(hours=72)).isoformat()
    }
    # Write to a sibling file and rename it into place so a concurrent
    # reader never sees a half-written registry.
    tmp_path = f"{ACTIVE_SHOPS_FILE}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(shops, f, indent=2)
    os.replace(tmp_path, ACTIVE_SHOPS_FILE)
def remove_shop_from_active(shop):
    """Remove *shop* from the active-shops registry (ACTIVE_SHOPS_FILE).

    Does nothing when the registry is missing, unreadable as JSON, or does
    not contain the shop.

    Args:
        shop: Shop (vhost directory) name used as the registry key.
    """
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
    except ValueError:
        # Corrupt registry: nothing reliable to remove, and deactivation
        # must not be aborted because of it.
        return
    if not isinstance(shops, dict) or shop not in shops:
        return
    del shops[shop]
    # Replace the file atomically so a concurrent reader never sees a
    # half-written registry.
    tmp_path = f"{ACTIVE_SHOPS_FILE}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(shops, f, indent=2)
    os.replace(tmp_path, ACTIVE_SHOPS_FILE)
def cleanup_crowdsec_decisions(shop):
    """Remove the CrowdSec decisions that were created for *shop*.

    Decisions are recognised by their reason, which the watcher sets to
    "GeoIP: Non-DACH IP blocked by <shop>". The decision list is parsed in
    Python and each IP is deleted with an argument list, so the shop name is
    never interpreted by a shell or as a regular expression (a pattern such
    as "shop.de" would otherwise also hit "myshop.de").

    Args:
        shop: Shop name whose decisions should be deleted.
    """
    import csv

    if not check_crowdsec():
        return
    print(f" 🔍 Entferne CrowdSec-Decisions für {shop}...")

    code, stdout, _ = run_command("cscli decisions list -o raw")
    reason_suffix = f"blocked by {shop}"
    ips = []
    if code == 0 and stdout:
        # Raw output is CSV: id,source,ip,reason,action,...
        for row in csv.reader(stdout.splitlines()):
            if not any(field.strip().endswith(reason_suffix) for field in row):
                continue
            if len(row) < 3:
                continue
            # Value looks like "Ip:1.2.3.4"; split once so IPv6 stays intact
            scope, sep, value = row[2].strip().partition(':')
            ip = value if sep and scope.lower() == 'ip' else row[2].strip()
            if ip and ip not in ips:
                ips.append(ip)

    removed = 0
    for ip in ips:
        try:
            result = subprocess.run(
                ['cscli', 'decisions', 'delete', '--ip', ip],
                capture_output=True, text=True, timeout=30,
            )
        except Exception:
            # Best effort: keep going with the remaining IPs
            continue
        if result.returncode == 0:
            removed += 1

    if removed > 0:
        print(f"{removed} Decisions entfernt")
    else:
        print(" Keine Decisions gefunden")
def get_available_shops():
    """Return the sorted names of all vhosts that look like a shop.

    A vhost counts as a shop when it is a directory below VHOSTS_DIR, is not
    one of the reserved names and contains httpdocs/index.php.
    """
    if not os.path.exists(VHOSTS_DIR):
        return []

    reserved = ['chroot', 'system', 'default']

    def _is_shop(name):
        vhost_dir = os.path.join(VHOSTS_DIR, name)
        if name in reserved or not os.path.isdir(vhost_dir):
            return False
        docroot = os.path.join(vhost_dir, 'httpdocs')
        return os.path.isdir(docroot) and os.path.isfile(os.path.join(docroot, 'index.php'))

    found = [name for name in os.listdir(VHOSTS_DIR) if _is_shop(name)]
    found.sort()
    return found
def get_active_shops():
    """Return the available shops that currently carry GeoIP blocking.

    A shop is treated as active when either the blocking script or the
    index.php backup is present in its httpdocs directory.
    """
    def _has_blocking_artifacts(shop):
        docroot = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        candidates = (
            os.path.join(docroot, BLOCKING_FILE),
            os.path.join(docroot, f'index.php{BACKUP_SUFFIX}'),
        )
        return any(os.path.isfile(path) for path in candidates)

    return [shop for shop in get_available_shops() if _has_blocking_artifacts(shop)]
def activate_blocking(shop, silent=False):
    """Activate GeoIP blocking for a single shop.

    Steps: install the watcher service when this is the first active shop
    (and CrowdSec is running), back up index.php, write the blocking script,
    add a ``require_once`` for it to index.php and register the shop.

    The blocking script is written BEFORE index.php starts requiring it, so
    no request ever hits a ``require_once`` of a missing file. If anything
    fails after the backup was taken, index.php is restored and the exception
    is re-raised.

    Args:
        shop: Shop (vhost directory) name.
        silent: Suppress this function's own progress output.

    Returns:
        True when blocking was activated; False when it is already active
        (backup present) or the shop has no index.php.
    """
    def say(message):
        if not silent:
            print(message)

    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    blocking_file = os.path.join(httpdocs, BLOCKING_FILE)

    if os.path.isfile(backup_php):
        say(f"⚠️ GeoIP-Blocking bereits aktiv für {shop}")
        return False
    if not os.path.isfile(index_php):
        say(f"❌ index.php nicht gefunden")
        return False

    say(f"\n🔧 Aktiviere DACH GeoIP-Blocking für: {shop}")
    say(" (Erlaubt: Deutschland, Österreich, Schweiz)")
    say("=" * 60)

    # Step 1: Install watcher service if not exists
    if not get_active_shops():  # First shop
        say("\n[1/3] Installiere CrowdSec-Watcher-Service...")
        if check_crowdsec():
            install_watcher_service()
        else:
            say(" ⚠️ CrowdSec nicht verfügbar - nur PHP-Blocking")
    else:
        say("\n[1/3] CrowdSec-Watcher-Service bereits aktiv")

    # Step 2: PHP blocking
    say("\n[2/3] Aktiviere PHP-Blocking...")

    # Prepare everything in memory first so a read/format error cannot
    # leave the shop half-modified.
    expiry = datetime.now() + timedelta(hours=72)
    geoip_content = GEOIP_SCRIPT.format(
        expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
        expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
        cache_file=CACHE_FILE,
        log_file=LOG_FILE,
        crowdsec_queue=CROWDSEC_QUEUE_FILE,
        shop_name=shop
    )
    with open(index_php, 'r', encoding='utf-8') as f:
        content = f.read()

    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    needs_patch = require_statement not in content
    patched_content = content
    if needs_patch:
        lines = content.split('\n')
        if '<?php' not in content:
            # No PHP block to hook into: add a self-contained one on top
            lines.insert(0, f"<?php {require_statement} ?>")
        else:
            # After declare(strict_types=...) if present (it must stay the
            # first statement), otherwise right after the first "<?php" line.
            insert_line = 0
            for i, line in enumerate(lines):
                if 'declare(strict_types' in line:
                    insert_line = i + 1
                    break
                elif '<?php' in line and insert_line == 0:
                    insert_line = i + 1
            lines.insert(insert_line, require_statement)
        patched_content = '\n'.join(lines)

    say(" 📋 Backup erstellen...")
    shutil.copy2(index_php, backup_php)
    try:
        with open(blocking_file, 'w', encoding='utf-8') as f:
            f.write(geoip_content)
        say(" 📝 geoip_blocking.php erstellt")
        if needs_patch:
            with open(index_php, 'w', encoding='utf-8') as f:
                f.write(patched_content)
        say(" ✏️ index.php modifiziert")

        # Step 3: Register shop
        say("\n[3/3] Registriere Shop...")
        add_shop_to_active(shop)
        say(" ✅ Shop registriert")
    except Exception:
        # Roll back: original index.php first, then the script it no longer
        # references (kept if index.php required it before we started).
        shutil.move(backup_php, index_php)
        if needs_patch and os.path.isfile(blocking_file):
            os.remove(blocking_file)
        raise

    say("\n" + "=" * 60)
    say(f"✅ DACH GeoIP-Blocking aktiviert für: {shop}")
    say(f" Erlaubte Länder: 🇩🇪 DE | 🇦🇹 AT | 🇨🇭 CH")
    say(f" Gültig bis: {expiry.strftime('%Y-%m-%d %H:%M:%S CET')}")
    say(f" PHP-Log: {os.path.join(httpdocs, LOG_FILE)}")
    say(f" CrowdSec-Queue: {os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)}")
    say(f"\n 🔄 Der Watcher-Service synchronisiert blockierte IPs zu CrowdSec")
    say("=" * 60)
    return True
def deactivate_blocking(shop, silent=False):
    """Deactivate GeoIP blocking for a single shop.

    Restores index.php from its backup (or, without a backup, strips every
    line mentioning the blocking script), deletes the files the blocking
    created, deregisters the shop, removes its CrowdSec decisions and
    uninstalls the watcher service when no other shop is active.

    Args:
        shop: Shop (vhost directory) name.
        silent: Suppress this function's own progress output (helpers that
            are called may still print).

    Returns:
        Always True.
    """
    def say(message):
        if not silent:
            print(message)

    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
    cache_file = os.path.join(httpdocs, CACHE_FILE)
    log_file = os.path.join(httpdocs, LOG_FILE)
    queue_file = os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)

    say(f"\n🔧 Deaktiviere DACH GeoIP-Blocking für: {shop}")
    say("=" * 60)

    # Step 1: Remove PHP blocking
    say("\n[1/4] PHP-Blocking entfernen...")
    if os.path.isfile(backup_php):
        shutil.move(backup_php, index_php)
        say(" 📋 index.php wiederhergestellt")
    elif os.path.isfile(index_php):
        # No backup: drop the require line by hand. Same encoding as on
        # activation, so non-ASCII content survives on any locale.
        with open(index_php, 'r', encoding='utf-8') as f:
            content = f.read()
        lines = [line for line in content.split('\n') if BLOCKING_FILE not in line]
        with open(index_php, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))
    for path in [blocking_file, cache_file, log_file, queue_file]:
        if os.path.isfile(path):
            os.remove(path)
    say(" 🗑️ PHP-Dateien gelöscht")

    # Step 2: Remove from tracking
    say("\n[2/4] Deregistriere Shop...")
    remove_shop_from_active(shop)
    say(" ✅ Shop deregistriert")

    # Step 3: Clean CrowdSec decisions (the helper itself returns early
    # when CrowdSec is not running)
    say("\n[3/4] CrowdSec-Decisions entfernen...")
    cleanup_crowdsec_decisions(shop)

    # Step 4: Uninstall service if last shop
    say("\n[4/4] Prüfe Watcher-Service...")
    remaining_shops = [s for s in get_active_shops() if s != shop]
    if not remaining_shops:
        say(" Keine aktiven Shops mehr - deinstalliere Service")
        uninstall_watcher_service()
    else:
        say(f" Service bleibt aktiv ({len(remaining_shops)} Shop(s) noch aktiv)")

    say("\n" + "=" * 60)
    say(f"✅ DACH GeoIP-Blocking deaktiviert für: {shop}")
    say("=" * 60)
    return True
def activate_all_shops():
    """Activate GeoIP blocking for every shop that is not active yet.

    Lists the affected shops, asks for confirmation and then activates them
    one by one through ``activate_blocking(shop, silent=True)``, so bulk and
    single activation share one implementation (including installation of the
    watcher service for the first shop). Prints a summary at the end.
    """
    shops = get_available_shops()
    active_shops = get_active_shops()
    available_shops = [s for s in shops if s not in active_shops]
    if not available_shops:
        print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
        print(" Alle Shops haben bereits aktives GeoIP-Blocking")
        return

    print(f"\n{'=' * 60}")
    print(f" DACH GeoIP-Blocking für ALLE Shops aktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 Folgende {len(available_shops)} Shop(s) werden aktiviert:\n")
    for shop in available_shops:
        print(f"{shop}")
    print(f"\n⚠️ Dies aktiviert den Schutz für alle oben genannten Shops!")
    confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
    if confirm not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    print(f"\n{'=' * 60}")
    print(" Starte Aktivierung...")
    print(f"{'=' * 60}")

    success_count = 0
    failed_shops = []
    for i, shop in enumerate(available_shops, 1):
        print(f"\n[{i}/{len(available_shops)}] Aktiviere: {shop}")
        backup_php = os.path.join(VHOSTS_DIR, shop, 'httpdocs', f'index.php{BACKUP_SUFFIX}')
        if os.path.isfile(backup_php):
            # Activated in the meantime: neither a success nor a failure
            print(f" ⚠️ Bereits aktiv - überspringe")
            continue
        try:
            # Use silent mode but show progress
            activated = activate_blocking(shop, silent=True)
        except Exception as e:
            print(f" ❌ Fehler: {e}")
            failed_shops.append(shop)
            continue
        if activated:
            expiry = datetime.now() + timedelta(hours=72)
            print(f" ✅ Aktiviert (bis {expiry.strftime('%Y-%m-%d %H:%M')})")
            success_count += 1
        else:
            # With the backup case handled above, the only remaining reason
            # for a False result is a missing index.php.
            print(f" ❌ index.php nicht gefunden")
            failed_shops.append(shop)

    # Summary
    print(f"\n{'=' * 60}")
    print(" ZUSAMMENFASSUNG")
    print(f"{'=' * 60}")
    print(f"\n ✅ Erfolgreich aktiviert: {success_count}")
    print(f" ❌ Fehlgeschlagen: {len(failed_shops)}")
    if failed_shops:
        print(f"\n Fehlgeschlagene Shops:")
        for shop in failed_shops:
            print(f"{shop}")
    print(f"\n 🇩🇪 🇦🇹 🇨🇭 Nur DACH-Traffic erlaubt")
    print(f" ⏰ Gültig für 72 Stunden")
    print(f"{'=' * 60}")
def deactivate_all_shops():
    """Deactivate GeoIP blocking for every active shop.

    Lists the affected shops, asks for confirmation and then deactivates them
    one by one through ``deactivate_blocking(shop, silent=True)``, so bulk
    and single deactivation share one implementation. The watcher service is
    only removed when no shop is active afterwards; if a shop could not be
    deactivated the service keeps running for it. Prints a summary at the end.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine Shops mit aktivem GeoIP-Blocking gefunden")
        return

    print(f"\n{'=' * 60}")
    print(f" DACH GeoIP-Blocking für ALLE Shops deaktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 Folgende {len(active_shops)} Shop(s) werden deaktiviert:\n")
    for shop in active_shops:
        print(f"{shop}")
    print(f"\n⚠️ Dies deaktiviert den Schutz für alle oben genannten Shops!")
    print(f"⚠️ Alle zugehörigen CrowdSec-Decisions werden ebenfalls entfernt!")
    confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
    if confirm not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    print(f"\n{'=' * 60}")
    print(" Starte Deaktivierung...")
    print(f"{'=' * 60}")

    success_count = 0
    failed_shops = []
    for i, shop in enumerate(active_shops, 1):
        print(f"\n[{i}/{len(active_shops)}] Deaktiviere: {shop}")
        try:
            deactivate_blocking(shop, silent=True)
        except Exception as e:
            print(f" ❌ Fehler: {e}")
            failed_shops.append(shop)
            continue
        print(f" ✅ Deaktiviert")
        success_count += 1

    # Watcher service: deactivate_blocking() already removes it together
    # with the last shop; only clean up leftovers, and never while a shop
    # that failed above is still active.
    still_active = get_active_shops()
    if still_active:
        print(f" Service bleibt aktiv ({len(still_active)} Shop(s) noch aktiv)")
    elif os.path.isfile(SYSTEMD_SERVICE) or os.path.isfile(WATCHER_SCRIPT):
        print("\n📦 Deinstalliere CrowdSec-Watcher-Service...")
        uninstall_watcher_service()

    # Summary
    print(f"\n{'=' * 60}")
    print(" ZUSAMMENFASSUNG")
    print(f"{'=' * 60}")
    print(f"\n ✅ Erfolgreich deaktiviert: {success_count}")
    print(f" ❌ Fehlgeschlagen: {len(failed_shops)}")
    if failed_shops:
        print(f"\n Fehlgeschlagene Shops:")
        for shop in failed_shops:
            print(f"{shop}")
    print(f"\n 🌍 Alle IPs sind nun wieder erlaubt")
    print(f"{'=' * 60}")
def get_shop_log_stats(shop):
    """Get log statistics for a single shop.

    Args:
        shop: Shop (vhost directory) name.

    Returns:
        ``(php_blocks, ips)``: the number of lines in the shop's block log
        and a dict mapping each logged IP to its number of occurrences.
        ``(0, {})`` when the shop has no log file.
    """
    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    php_blocks = 0
    ips = {}
    if not os.path.isfile(log_file):
        return php_blocks, ips
    # The log contains raw User-Agent bytes sent by clients, which need not
    # be valid UTF-8; undecodable bytes are replaced instead of raising.
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            php_blocks += 1
            # Line format: "[timestamp] IP: <ip> | UA: ... | URI: ..."
            _, marker, rest = line.partition('IP: ')
            if marker:
                ip = rest.split(' |')[0].strip()
                ips[ip] = ips.get(ip, 0) + 1
    return php_blocks, ips
def get_crowdsec_stats_by_shop():
    """Get CrowdSec decision counts grouped by shop.

    A decision is attributed to a shop when one of its CSV fields ends with
    "blocked by <shop>" (the reason set by the watcher). Matching the full
    suffix keeps shops whose names contain each other apart.

    Returns:
        Dict mapping shop name to its number of decisions; shops without
        decisions are absent. Empty when CrowdSec is not running or the
        decision list cannot be read.
    """
    import csv

    if not check_crowdsec():
        return {}
    code, stdout, _ = run_command("cscli decisions list -o raw")
    if code != 0 or not stdout:
        return {}

    # Resolve the active shops once instead of once per decision line
    suffixes = {shop: f"blocked by {shop}" for shop in get_active_shops()}
    stats = {}
    rows = csv.reader(stdout.strip().split('\n')[1:])  # Skip header
    for row in rows:
        fields = [field.strip() for field in row]
        for shop, suffix in suffixes.items():
            if any(field.endswith(suffix) for field in fields):
                stats[shop] = stats.get(shop, 0) + 1
                break
    return stats
def show_all_logs():
    """Show combined block statistics for all active shops.

    Prints the PHP block count and CrowdSec ban count per shop and the ten
    most frequently blocked IPs across all shops, then waits for Enter.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    # The separator and bar strings were empty (multiplying "" draws
    # nothing); use visible fill characters.
    rule = "=" * 60

    def bar(count, per_block):
        """One block per *per_block* hits, capped at 20 blocks."""
        return "█" * min(count // per_block, 20)

    print(f"\n{rule}")
    print(" 📊 GESAMTÜBERSICHT ALLER SHOPS")
    print(f"{rule}")

    total_php_blocks = 0
    shop_php_stats = {}
    all_ips = {}
    # Collect PHP stats
    for shop in active_shops:
        blocks, ips = get_shop_log_stats(shop)
        total_php_blocks += blocks
        shop_php_stats[shop] = blocks
        for ip, count in ips.items():
            all_ips[ip] = all_ips.get(ip, 0) + count

    # Get CrowdSec stats
    crowdsec_stats = get_crowdsec_stats_by_shop()
    total_crowdsec = sum(crowdsec_stats.values())

    # Display PHP blocks
    print(f"\n📝 PHP-Blocks gesamt: {total_php_blocks}")
    for shop in sorted(shop_php_stats):
        count = shop_php_stats[shop]
        print(f" ├─ {shop}: {count} {bar(count, 10)}")

    # Display CrowdSec bans
    print(f"\n🛡️ CrowdSec-Bans gesamt: {total_crowdsec}")
    if crowdsec_stats:
        for shop in sorted(crowdsec_stats):
            count = crowdsec_stats[shop]
            print(f" ├─ {shop}: {count} {bar(count, 10)}")
    elif check_crowdsec():
        print(" └─ Keine aktiven Bans")
    else:
        print(" └─ CrowdSec nicht verfügbar")

    # Top blocked IPs
    if all_ips:
        print(f"\n🔥 Top 10 blockierte IPs (alle Shops):")
        sorted_ips = sorted(all_ips.items(), key=lambda x: x[1], reverse=True)[:10]
        for ip, count in sorted_ips:
            print(f" {ip}: {count} {bar(count, 5)}")

    print(f"\n{rule}")
    # Wait for user
    input("\nDrücke Enter um fortzufahren...")
def show_logs(shop):
    """Show the block log and the CrowdSec decisions of a single shop.

    Prints the last 50 lines of the shop's PHP block log and, when CrowdSec
    is running, up to 20 of the decisions whose reason ends with
    "blocked by <shop>".

    Args:
        shop: Shop (vhost directory) name.
    """
    import csv

    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    if os.path.isfile(log_file):
        print(f"\n📊 PHP-Blocks für {shop}:")
        print("=" * 80)
        # The log holds raw User-Agent bytes; replace what cannot be decoded
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()
        for line in lines[-50:]:
            print(line.rstrip())
        print("=" * 80)
        print(f"Gesamt: {len(lines)}")
    else:
        print(f" Keine Logs für {shop}")

    if not check_crowdsec():
        return

    print(f"\n📊 CrowdSec Decisions für {shop}:")
    print("=" * 80)
    # Use raw output (CSV format)
    # Format: id,source,ip,reason,action,country,as,events_count,expiration,simulated,alert_id
    code, stdout, _ = run_command("cscli decisions list -o raw")
    if code == 0 and stdout:
        suffix = f"blocked by {shop}"
        rows = list(csv.reader(stdout.strip().split('\n')))
        # Match the complete reason suffix so another shop whose name merely
        # contains this one is not listed here.
        shop_decisions = [
            row for row in rows[1:]  # Skip header
            if any(field.strip().endswith(suffix) for field in row)
        ]
        if shop_decisions:
            print(f"Aktive Bans: {len(shop_decisions)}")
            print("\nLetzte 20 Bans:")
            for row in shop_decisions[:20]:
                if len(row) > 8:
                    # Column 2: ip (format "Ip:1.2.3.4")
                    ip_field = row[2].strip()
                    if ':' in ip_field:
                        ip = ip_field.split(':', 1)[1]
                    else:
                        ip = ip_field
                    # Column 8: expiration
                    expiry = row[8].strip()
                    print(f" 🚫 {ip} (bis {expiry})")
            if len(shop_decisions) > 20:
                print(f" ... und {len(shop_decisions) - 20} weitere")
        else:
            print("Keine aktiven CrowdSec-Bans für diesen Shop")
    else:
        print("Konnte Decisions nicht abrufen")
    print("=" * 80)
def _read_menu_number(prompt):
    """Prompt for a number; return it as int, or None after reporting bad input."""
    try:
        return int(input(prompt).strip())
    except ValueError:
        print("❌ Ungültig")
        return None


def _confirmed(prompt):
    """Ask a yes/no question; True only for ja/j/yes/y (case-insensitive)."""
    return input(prompt).strip().lower() in ['ja', 'j', 'yes', 'y']


def main():
    """Main menu.

    Prints the CrowdSec and watcher-service status once, then loops over the
    interactive menu until option 0 is chosen. Unknown menu entries are
    reported instead of being ignored.
    """
    print("\n" + "=" * 60)
    print(" GeoIP Shop Blocker Manager - DACH Version")
    print(" Erlaubt: 🇩🇪 Deutschland | 🇦🇹 Österreich | 🇨🇭 Schweiz")
    print(" PHP + CrowdSec Watcher (systemd service)")
    print("=" * 60)
    if check_crowdsec():
        print(" ✅ CrowdSec: Aktiv")
    else:
        print(" ⚠️ CrowdSec: Nicht verfügbar")
    # Check service status
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    if code == 0 and stdout.strip() == "active":
        print(" ✅ Watcher-Service: Läuft")
    else:
        print(" ⚠️ Watcher-Service: Nicht aktiv")

    while True:
        print("\n" + "-" * 40)
        print("[1] GeoIP-Blocking AKTIVIEREN (einzeln)")
        print("[2] GeoIP-Blocking DEAKTIVIEREN (einzeln)")
        print("[3] Logs anzeigen")
        print("[4] Status anzeigen")
        print("-" * 40)
        print("[5] 🚀 ALLE Shops aktivieren")
        print("[6] 🛑 ALLE Shops deaktivieren")
        print("-" * 40)
        print("[0] Beenden")
        choice = input("\nWähle eine Option: ").strip()

        if choice == "1":
            shops = get_available_shops()
            active_shops = get_active_shops()
            available_shops = [s for s in shops if s not in active_shops]
            if not available_shops:
                print("\n⚠️ Keine Shops verfügbar")
                continue
            print("\n📋 Verfügbare Shops:")
            for i, shop in enumerate(available_shops, 1):
                print(f" [{i}] {shop}")
            number = _read_menu_number("\nWähle einen Shop: ")
            if number is None:
                continue
            if 1 <= number <= len(available_shops):
                selected_shop = available_shops[number - 1]
                if _confirmed(f"\n⚠️ DACH-Blocking aktivieren für '{selected_shop}'? (ja/nein): "):
                    activate_blocking(selected_shop)
            else:
                print("❌ Ungültig")

        elif choice == "2":
            active_shops = get_active_shops()
            if not active_shops:
                print("\n⚠️ Keine aktiven Shops")
                continue
            print("\n📋 Aktive Shops:")
            for i, shop in enumerate(active_shops, 1):
                print(f" [{i}] {shop}")
            number = _read_menu_number("\nWähle einen Shop: ")
            if number is None:
                continue
            if 1 <= number <= len(active_shops):
                selected_shop = active_shops[number - 1]
                if _confirmed(f"\n⚠️ Deaktivieren für '{selected_shop}'? (ja/nein): "):
                    deactivate_blocking(selected_shop)
            else:
                print("❌ Ungültig")

        elif choice == "3":
            active_shops = get_active_shops()
            if not active_shops:
                print("\n⚠️ Keine aktiven Shops")
                continue
            print("\n📋 Logs anzeigen für:")
            print(f" [0] 📊 ALLE Shops (Zusammenfassung)")
            for i, shop in enumerate(active_shops, 1):
                print(f" [{i}] {shop}")
            number = _read_menu_number("\nWähle eine Option: ")
            if number is None:
                continue
            if number == 0:
                show_all_logs()
            elif 1 <= number <= len(active_shops):
                show_logs(active_shops[number - 1])
            else:
                print("❌ Ungültig")

        elif choice == "4":
            shops = get_available_shops()
            active_shops = get_active_shops()
            print(f"\n📊 Status:")
            print(f" Shops gesamt: {len(shops)}")
            print(f" Aktive DACH-Blockings: {len(active_shops)}")
            for shop in active_shops:
                print(f"{shop}")

        elif choice == "5":
            activate_all_shops()

        elif choice == "6":
            deactivate_all_shops()

        elif choice == "0":
            print("\n👋 Auf Wiedersehen!")
            break

        else:
            # Anything else used to redraw the menu without any feedback
            print("❌ Ungültig")
if __name__ == "__main__":
    # The tool edits vhost files and manages a systemd unit, so it insists
    # on running as root before showing the menu.
    running_as_root = os.geteuid() == 0
    if running_as_root:
        try:
            main()
        except KeyboardInterrupt:
            # Ctrl+C anywhere in the menu is a normal way to leave
            print("\n\n👋 Abgebrochen")
            sys.exit(0)
    else:
        print("❌ Als root ausführen!")
        sys.exit(1)