Files
geoip_shop_manager/geoip_shop_manager.py

1644 lines
54 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GeoIP Shop Blocker Manager - DACH Version
2-Component System: PHP blocking + Python watcher (systemd service)
Blocks all IPs outside Germany, Austria, and Switzerland (DACH region)
"""
import os
import sys
import shutil
import subprocess
import json
import time
import re
from datetime import datetime, timedelta
from pathlib import Path
# Configuration
# Root directory scanned for shops; each shop lives at <VHOSTS_DIR>/<shop>/httpdocs.
VHOSTS_DIR = "/var/www/vhosts"
# Suffix appended to "index.php" to name the backup taken before patching it.
BACKUP_SUFFIX = ".geoip_backup"
# File name of the generated PHP blocking script inside a shop's httpdocs.
BLOCKING_FILE = "geoip_blocking.php"
# File name of the PHP-side cache holding the serialized DACH IP ranges.
CACHE_FILE = "dach_ip_ranges.cache"
# File name of the human-readable log of blocked requests (written by PHP).
LOG_FILE = "geoip_blocked.log"
# File name of the queue the PHP script appends to for the CrowdSec watcher.
CROWDSEC_QUEUE_FILE = "geoip_crowdsec_queue.log"
# Install location of the generated watcher script.
WATCHER_SCRIPT = "/usr/local/bin/geoip_crowdsec_watcher.py"
# Install location of the generated systemd unit file.
SYSTEMD_SERVICE = "/etc/systemd/system/geoip-crowdsec-watcher.service"
# JSON registry of shops with active blocking (shop -> activated/expiry/mode).
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
# =============================================================================
# BOT DETECTION - Comprehensive list of known bots/crawlers
# =============================================================================
# Maps a display name to the regular expression that is searched
# (case-insensitively) in the User-Agent header by detect_bot().
BOT_PATTERNS = {
    # OpenAI
    'GPTBot': r'GPTBot',
    'OAI-SearchBot': r'OAI-SearchBot',
    'ChatGPT-User': r'ChatGPT-User',
    # Anthropic (Claude)
    'ClaudeBot': r'ClaudeBot',
    'Claude-User': r'Claude-User',
    'Claude-SearchBot': r'Claude-SearchBot',
    'anthropic-ai': r'anthropic-ai',
    'claude-web': r'claude-web',
    # Google
    'Googlebot': r'Googlebot',
    'Google-Extended': r'Google-Extended',
    'Googlebot-Image': r'Googlebot-Image',
    'Googlebot-Video': r'Googlebot-Video',
    'Googlebot-News': r'Googlebot-News',
    'Gemini-Deep-Research': r'Gemini-Deep-Research',
    'Google-CloudVertexBot': r'Google-CloudVertexBot',
    'AdsBot-Google': r'AdsBot-Google',
    'Mediapartners-Google': r'Mediapartners-Google',
    'FeedFetcher-Google': r'FeedFetcher-Google',
    'Google-InspectionTool': r'Google-InspectionTool',
    # Microsoft/Bing
    'Bingbot': r'[Bb]ingbot',
    'BingPreview': r'BingPreview',
    'msnbot': r'msnbot',
    'AdIdxBot': r'AdIdxBot',
    # Perplexity
    'PerplexityBot': r'PerplexityBot',
    'Perplexity-User': r'Perplexity-User',
    # Apple
    'Applebot': r'Applebot',
    'Applebot-Extended': r'Applebot-Extended',
    # Amazon
    'Amazonbot': r'Amazonbot',
    # Meta/Facebook
    'FacebookBot': r'facebookexternalhit|FacebookBot',
    'meta-externalagent': r'meta-externalagent',
    'Meta-WebIndexer': r'Meta-WebIndexer',
    # ByteDance/TikTok
    'Bytespider': r'Bytespider',
    # DuckDuckGo
    'DuckDuckBot': r'DuckDuckBot',
    'DuckAssistBot': r'DuckAssistBot',
    # Other AI/LLM
    'cohere-ai': r'cohere-ai',
    'YouBot': r'YouBot',
    'MistralAI-User': r'MistralAI-User',
    'AI2Bot': r'AI2Bot',
    'CCBot': r'CCBot',
    'Diffbot': r'Diffbot',
    'Timpibot': r'Timpibot',
    'omgili': r'omgili',
    'webzio': r'webzio',
    'ICC-Crawler': r'ICC-Crawler',
    # SEO Tools
    'AhrefsBot': r'AhrefsBot',
    'SemrushBot': r'SemrushBot',
    'MJ12bot': r'MJ12bot',
    'DotBot': r'DotBot',
    'BLEXBot': r'BLEXBot',
    'DataForSeoBot': r'DataForSeoBot',
    'SEOkicks': r'SEOkicks',
    'seoscanners': r'seoscanners',
    'Screaming Frog': r'Screaming Frog',
    'Sistrix': r'Sistrix',
    'JEEC2Bot': r'JEEC2Bot',
    # Other Search Engines
    'YandexBot': r'YandexBot',
    'YandexImages': r'YandexImages',
    'Baiduspider': r'Baiduspider',
    'PetalBot': r'PetalBot',
    'Sogou': r'Sogou',
    'Qwantify': r'Qwantify',
    'ia_archiver': r'ia_archiver',
    # Social Media
    'LinkedInBot': r'LinkedInBot',
    'Twitterbot': r'Twitterbot',
    'Pinterest': r'Pinterest',
    'Slackbot': r'Slackbot',
    'TelegramBot': r'TelegramBot',
    'WhatsApp': r'WhatsApp',
    'Discordbot': r'Discordbot',
    # Monitoring & Security
    'UptimeRobot': r'UptimeRobot',
    'Pingdom': r'Pingdom',
    'StatusCake': r'StatusCake',
    'GTmetrix': r'GTmetrix',
    'Site24x7': r'Site24x7',
    # Payment/E-Commerce
    'PayPal IPN': r'PayPal',
    'Stripe': r'Stripe',
    'Shopify': r'Shopify',
    # Feed Readers
    'Feedly': r'Feedly',
    'NewsBlur': r'NewsBlur',
    # Other known bots
    'SeznamBot': r'SeznamBot',
    'Exabot': r'Exabot',
    'archive.org_bot': r'archive\.org_bot',
    'Wget': r'Wget',
    'curl': r'^curl/',
    'python-requests': r'python-requests',
    'Go-http-client': r'Go-http-client',
    'Java': r'^Java/',
    'Apache-HttpClient': r'Apache-HttpClient',
    'okhttp': r'okhttp',
    'HeadlessChrome': r'HeadlessChrome',
    'PhantomJS': r'PhantomJS',
    'Scrapy': r'Scrapy',
}


def detect_bot(user_agent):
    """Detect bot name from user agent string.

    Patterns are tried in BOT_PATTERNS order and the first hit wins, except
    that a later pattern replaces the current hit when its match fully covers
    it and is longer. Without that rule 'Googlebot' would always shadow
    'Googlebot-Image', 'Applebot' would shadow 'Applebot-Extended', etc.

    Args:
        user_agent: Raw User-Agent header value; may be empty or None.

    Returns:
        The matching key of BOT_PATTERNS, or 'Unbekannt' when the user agent
        is empty, equals 'Unknown', or matches no pattern.
    """
    if not user_agent or user_agent == 'Unknown':
        return 'Unbekannt'
    best = None  # (bot_name, match_start, match_end)
    for bot_name, pattern in BOT_PATTERNS.items():
        match = re.search(pattern, user_agent, re.IGNORECASE)
        if match is None:
            continue
        start, end = match.span()
        if best is None:
            best = (bot_name, start, end)
            continue
        _, best_start, best_end = best
        covers_best = start <= best_start and end >= best_end
        if covers_best and (end - start) > (best_end - best_start):
            best = (bot_name, start, end)
    return best[0] if best else 'Unbekannt'
# PHP GeoIP blocking script (no exec, just logging).
# This is a str.format() template: the placeholders are expiry_date,
# expiry_timestamp, cache_file, log_file, crowdsec_queue and shop_name, and
# every literal PHP brace is doubled.
# When no usable DACH range list is available (download failed, cache missing
# or corrupt) the script lets the request through: blocking with an empty list
# would return 403 to every visitor and queue all of them for a CrowdSec ban.
# NOTE(review): the ipdeny zone files used here are IPv4 only, so IPv6
# visitors never match a range and are blocked - confirm this is intended.
GEOIP_SCRIPT = '''<?php
/**
 * GeoIP Blocking Script - Blocks all non-DACH IPs
 * DACH = Germany (DE), Austria (AT), Switzerland (CH)
 * Logs blocked IPs for CrowdSec watcher to process
 * Valid until: {expiry_date}
 */
// Auto-disable after 72 hours
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) {{
    return; // Script expired, allow all traffic
}}
// Get visitor IP
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) {{
    return;
}}
// Skip private IPs
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {{
    return;
}}
// Files
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400; // 24 hours
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';
// Function to download DACH IP ranges (Germany, Austria, Switzerland).
// Returns an empty list unless every country list could be fetched, so a
// partial list is never used or cached.
function download_dach_ranges() {{
    $ranges = [];
    $countries = ['de', 'at', 'ch']; // Germany, Austria, Switzerland
    foreach ($countries as $country) {{
        $url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
        $content = @file_get_contents($url);
        if ($content === false) {{
            return [];
        }}
        $count_before = count($ranges);
        $lines = explode("\\n", trim($content));
        foreach ($lines as $line) {{
            $line = trim($line);
            if (!empty($line) && strpos($line, '/') !== false) {{
                $ranges[] = $line;
            }}
        }}
        if (count($ranges) === $count_before) {{
            return []; // Response contained no CIDR ranges
        }}
    }}
    return $ranges;
}}
// Function to check if IP is in CIDR range (IPv4 only)
function ip_in_range($ip, $cidr) {{
    if (!is_string($cidr) || strpos($cidr, '/') === false) {{
        return false;
    }}
    list($subnet, $mask) = explode('/', $cidr, 2);
    $ip_long = ip2long($ip);
    $subnet_long = ip2long($subnet);
    if ($ip_long === false || $subnet_long === false) {{
        return false;
    }}
    $mask_long = -1 << (32 - (int)$mask);
    return ($ip_long & $mask_long) == ($subnet_long & $mask_long);
}}
// Load or download IP ranges
$dach_ranges = [];
$cache_is_fresh = false;
if (file_exists($cache_file)) {{
    $cached = @unserialize((string)@file_get_contents($cache_file), ['allowed_classes' => false]);
    if (is_array($cached) && !empty($cached)) {{
        $dach_ranges = $cached;
        $cache_is_fresh = (time() - filemtime($cache_file)) < $cache_duration;
    }}
}}
if (!$cache_is_fresh) {{
    $fresh_ranges = download_dach_ranges();
    if (!empty($fresh_ranges)) {{
        $dach_ranges = $fresh_ranges;
        @file_put_contents($cache_file, serialize($dach_ranges), LOCK_EX);
    }}
    // Otherwise keep using the stale cache, if there is one
}}
// Fail open: without a range list nobody can be classified
if (empty($dach_ranges)) {{
    return;
}}
// Check if visitor IP is from DACH region
$is_dach = false;
foreach ($dach_ranges as $range) {{
    if (ip_in_range($visitor_ip, $range)) {{
        $is_dach = true;
        break;
    }}
}}
// Block non-DACH IPs
if (!$is_dach) {{
    $timestamp = date('Y-m-d H:i:s');
    $user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
    $request_uri = $_SERVER['REQUEST_URI'] ?? '/';
    // Log for humans
    $log_entry = "[$timestamp] IP: $visitor_ip | UA: $user_agent | URI: $request_uri\\n";
    @file_put_contents($log_file, $log_entry, FILE_APPEND | LOCK_EX);
    // Queue for CrowdSec (simple format)
    $queue_entry = "$timestamp|$visitor_ip|{shop_name}\\n";
    @file_put_contents($crowdsec_queue, $queue_entry, FILE_APPEND | LOCK_EX);
    header('HTTP/1.1 403 Forbidden');
    exit;
}}
'''
# PHP GeoIP blocking script - PHP ONLY (no CrowdSec queue).
# This is a str.format() template: the placeholders are expiry_date,
# expiry_timestamp, cache_file and log_file, and every literal PHP brace is
# doubled.
# When no usable DACH range list is available (download failed, cache missing
# or corrupt) the script lets the request through instead of returning 403 to
# every visitor.
# NOTE(review): the ipdeny zone files used here are IPv4 only, so IPv6
# visitors never match a range and are blocked - confirm this is intended.
GEOIP_SCRIPT_PHP_ONLY = '''<?php
/**
 * GeoIP Blocking Script - Blocks all non-DACH IPs (PHP-only mode)
 * DACH = Germany (DE), Austria (AT), Switzerland (CH)
 * Logs blocked IPs (no CrowdSec synchronisation)
 * Valid until: {expiry_date}
 */
// Auto-disable after 72 hours
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) {{
    return; // Script expired, allow all traffic
}}
// Get visitor IP
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) {{
    return;
}}
// Skip private IPs
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {{
    return;
}}
// Files
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400; // 24 hours
$log_file = __DIR__ . '/{log_file}';
// Function to download DACH IP ranges (Germany, Austria, Switzerland).
// Returns an empty list unless every country list could be fetched, so a
// partial list is never used or cached.
function download_dach_ranges() {{
    $ranges = [];
    $countries = ['de', 'at', 'ch']; // Germany, Austria, Switzerland
    foreach ($countries as $country) {{
        $url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
        $content = @file_get_contents($url);
        if ($content === false) {{
            return [];
        }}
        $count_before = count($ranges);
        $lines = explode("\\n", trim($content));
        foreach ($lines as $line) {{
            $line = trim($line);
            if (!empty($line) && strpos($line, '/') !== false) {{
                $ranges[] = $line;
            }}
        }}
        if (count($ranges) === $count_before) {{
            return []; // Response contained no CIDR ranges
        }}
    }}
    return $ranges;
}}
// Function to check if IP is in CIDR range (IPv4 only)
function ip_in_range($ip, $cidr) {{
    if (!is_string($cidr) || strpos($cidr, '/') === false) {{
        return false;
    }}
    list($subnet, $mask) = explode('/', $cidr, 2);
    $ip_long = ip2long($ip);
    $subnet_long = ip2long($subnet);
    if ($ip_long === false || $subnet_long === false) {{
        return false;
    }}
    $mask_long = -1 << (32 - (int)$mask);
    return ($ip_long & $mask_long) == ($subnet_long & $mask_long);
}}
// Load or download IP ranges
$dach_ranges = [];
$cache_is_fresh = false;
if (file_exists($cache_file)) {{
    $cached = @unserialize((string)@file_get_contents($cache_file), ['allowed_classes' => false]);
    if (is_array($cached) && !empty($cached)) {{
        $dach_ranges = $cached;
        $cache_is_fresh = (time() - filemtime($cache_file)) < $cache_duration;
    }}
}}
if (!$cache_is_fresh) {{
    $fresh_ranges = download_dach_ranges();
    if (!empty($fresh_ranges)) {{
        $dach_ranges = $fresh_ranges;
        @file_put_contents($cache_file, serialize($dach_ranges), LOCK_EX);
    }}
    // Otherwise keep using the stale cache, if there is one
}}
// Fail open: without a range list nobody can be classified
if (empty($dach_ranges)) {{
    return;
}}
// Check if visitor IP is from DACH region
$is_dach = false;
foreach ($dach_ranges as $range) {{
    if (ip_in_range($visitor_ip, $range)) {{
        $is_dach = true;
        break;
    }}
}}
// Block non-DACH IPs
if (!$is_dach) {{
    $timestamp = date('Y-m-d H:i:s');
    $user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
    $request_uri = $_SERVER['REQUEST_URI'] ?? '/';
    // Log for humans
    $log_entry = "[$timestamp] IP: $visitor_ip | UA: $user_agent | URI: $request_uri\\n";
    @file_put_contents($log_file, $log_entry, FILE_APPEND | LOCK_EX);
    header('HTTP/1.1 403 Forbidden');
    exit;
}}
'''
# Python watcher script (runs as systemd service).
# Written verbatim to WATCHER_SCRIPT by install_watcher_service(); it is NOT a
# str.format() template, so braces are literal. Keep it free of backslashes
# and triple single quotes, since it lives inside a non-raw ''' literal.
# Queue handling: the queue file is renamed before it is read, so entries the
# PHP script appends while a batch is being processed land in a fresh file
# instead of being wiped by a later truncate. Entries whose cscli call fails
# are logged and dropped; a blocked client is queued again on its next request.
WATCHER_SCRIPT_CONTENT = '''#!/usr/bin/env python3
"""
GeoIP CrowdSec Watcher Service
Monitors queue files and adds blocked IPs to CrowdSec
"""
import ipaddress
import json
import os
import subprocess
import time
from datetime import datetime

VHOSTS_DIR = "/var/www/vhosts"
QUEUE_FILE = "geoip_crowdsec_queue.log"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
PROCESSED_IPS = {}  # ip -> time.time() of the last successful cscli add
DEDUP_SECONDS = 3600  # Do not re-add the same IP within this window
CHECK_INTERVAL = 5  # Check every 5 seconds


def log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)


def get_active_shops():
    """Get mapping of shops with active GeoIP blocking"""
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or half-written registry: treat as "no shops"
        return {}
    return shops if isinstance(shops, dict) else {}


def forget_old_ips(now):
    """Drop dedup entries older than the window so the cache stays bounded"""
    expired = [ip for ip, added in PROCESSED_IPS.items() if now - added >= DEDUP_SECONDS]
    for ip in expired:
        del PROCESSED_IPS[ip]


def add_to_crowdsec(ip, shop):
    """Add IP to CrowdSec with 72h ban"""
    # Check if already processed recently (within last hour)
    now = time.time()
    if ip in PROCESSED_IPS and (now - PROCESSED_IPS[ip]) < DEDUP_SECONDS:
        return True
    cmd = [
        'cscli', 'decisions', 'add',
        '--ip', ip,
        '--duration', '72h',
        '--type', 'ban',
        '--reason', f'GeoIP: Non-DACH IP blocked by {shop}'
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except Exception as e:
        log(f"❌ Error adding {ip}: {e}")
        return False
    if result.returncode == 0:
        PROCESSED_IPS[ip] = now
        log(f"✅ Added {ip} to CrowdSec (from {shop})")
        return True
    log(f"⚠️ Failed to add {ip}: {result.stderr.strip()}")
    return False


def process_queue_file(shop_path, shop):
    """Process queue file for a shop and return the number of handled IPs"""
    queue_file = os.path.join(shop_path, 'httpdocs', QUEUE_FILE)
    work_file = queue_file + '.processing'
    # A leftover work file means an earlier batch was interrupted: finish it
    # first, the live queue is picked up in the next cycle.
    if not os.path.isfile(work_file):
        if not os.path.isfile(queue_file):
            return 0
        try:
            os.replace(queue_file, work_file)
        except OSError as e:
            log(f"❌ Error rotating queue for {shop}: {e}")
            return 0
    processed = 0
    dropped = 0
    try:
        with open(work_file, 'r', errors='replace') as f:
            lines = f.readlines()
        for line in lines:
            parts = line.strip().split('|')
            if len(parts) < 2:
                continue
            ip = parts[1].strip()
            try:
                ipaddress.ip_address(ip)
            except ValueError:
                dropped += 1
                continue
            if add_to_crowdsec(ip, shop):
                processed += 1
            else:
                dropped += 1
        os.remove(work_file)
    except Exception as e:
        log(f"❌ Error processing {shop}: {e}")
    if dropped:
        log(f"⚠️ Dropped {dropped} queue entries for {shop}")
    return processed


def main():
    log("🚀 GeoIP CrowdSec Watcher started (DACH mode)")
    while True:
        try:
            forget_old_ips(time.time())
            active_shops = get_active_shops()
            if not active_shops:
                time.sleep(CHECK_INTERVAL)
                continue
            total_processed = 0
            for shop in active_shops:
                shop_path = os.path.join(VHOSTS_DIR, shop)
                if os.path.isdir(shop_path):
                    total_processed += process_queue_file(shop_path, shop)
            if total_processed > 0:
                log(f"📊 Processed {total_processed} IPs in this cycle")
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            log("👋 Shutting down...")
            break
        except Exception as e:
            log(f"❌ Error in main loop: {e}")
            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
'''
# Systemd service file
# Unit definition written verbatim to SYSTEMD_SERVICE by
# install_watcher_service(). ExecStart hard-codes the same path as
# WATCHER_SCRIPT, so the two must be changed together.
SYSTEMD_SERVICE_CONTENT = '''[Unit]
Description=GeoIP CrowdSec Watcher Service (DACH)
After=network.target crowdsec.service
Wants=crowdsec.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/geoip_crowdsec_watcher.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
'''
def run_command(cmd, capture_output=True):
    """Execute *cmd* through the shell with a 30 second timeout.

    Args:
        cmd: Shell command line.
        capture_output: When true, stdout/stderr are captured as text and
            returned; otherwise they go to the parent's streams.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. stdout and stderr are empty
        strings when output is not captured. Any exception (including a
        timeout) yields ``(-1, "", str(exception))``.
    """
    options = {"shell": True, "timeout": 30}
    if capture_output:
        options["capture_output"] = True
        options["text"] = True
    try:
        completed = subprocess.run(cmd, **options)
    except Exception as exc:
        return -1, "", str(exc)
    if not capture_output:
        return completed.returncode, "", ""
    return completed.returncode, completed.stdout, completed.stderr
def check_crowdsec():
    """Return True when ``systemctl is-active crowdsec`` reports "active"."""
    exit_code, output, _ = run_command("systemctl is-active crowdsec")
    if exit_code != 0:
        return False
    return output.strip() == "active"
def install_watcher_service():
    """Install the watcher script and systemd service.

    Writes WATCHER_SCRIPT_CONTENT to WATCHER_SCRIPT (mode 0755) and
    SYSTEMD_SERVICE_CONTENT to SYSTEMD_SERVICE, reloads systemd, enables the
    unit and (re)starts it.

    Returns:
        True when ``systemctl is-active`` reports the service as active one
        second after starting it, otherwise False.
    """
    print(" 📦 Installiere CrowdSec-Watcher-Service...")
    # Create watcher script. The script contains emoji, so the encoding must
    # not depend on the locale this tool happens to be started with.
    with open(WATCHER_SCRIPT, 'w', encoding='utf-8') as f:
        f.write(WATCHER_SCRIPT_CONTENT)
    os.chmod(WATCHER_SCRIPT, 0o755)
    print(f" ✅ Watcher-Script erstellt: {WATCHER_SCRIPT}")
    # Create systemd service
    with open(SYSTEMD_SERVICE, 'w', encoding='utf-8') as f:
        f.write(SYSTEMD_SERVICE_CONTENT)
    print(f" ✅ Systemd-Service erstellt: {SYSTEMD_SERVICE}")
    # Reload systemd and start service. "restart" instead of "start" so that
    # an already running watcher picks up the freshly written script.
    run_command("systemctl daemon-reload")
    run_command("systemctl enable geoip-crowdsec-watcher.service")
    run_command("systemctl restart geoip-crowdsec-watcher.service")
    # Check if started
    time.sleep(1)
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    if code == 0 and stdout.strip() == "active":
        print(" ✅ Service gestartet und läuft")
        return True
    print(" ⚠️ Service konnte nicht gestartet werden")
    return False
def uninstall_watcher_service():
    """Stop and disable the watcher unit, delete its files, reload systemd."""
    print(" 📦 Deinstalliere CrowdSec-Watcher-Service...")
    # Stop and disable service
    for action in ("stop", "disable"):
        run_command(f"systemctl {action} geoip-crowdsec-watcher.service")
    # Remove files (unit file first, then the script)
    removable = (
        (SYSTEMD_SERVICE, " 🗑️ Service-Datei gelöscht"),
        (WATCHER_SCRIPT, " 🗑️ Watcher-Script gelöscht"),
    )
    for path, message in removable:
        if os.path.isfile(path):
            os.remove(path)
            print(message)
    run_command("systemctl daemon-reload")
    print(" ✅ Service deinstalliert")
def add_shop_to_active(shop, mode="php+crowdsec"):
    """Add shop to active shops tracking with mode.

    Records the activation time, the expiry (activation + 72 hours) and the
    blocking mode for *shop* in ACTIVE_SHOPS_FILE, creating the file and its
    directory when necessary.

    Args:
        shop: Shop (vhost directory) name used as the registry key.
        mode: "php+crowdsec" or "php-only".
    """
    os.makedirs(os.path.dirname(ACTIVE_SHOPS_FILE), exist_ok=True)
    shops = {}
    try:
        with open(ACTIVE_SHOPS_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        if isinstance(loaded, dict):
            shops = loaded
    except FileNotFoundError:
        pass
    except ValueError:
        # Corrupt registry: the readers in this file already treat that as
        # "no entries", so start over instead of aborting the activation.
        pass
    # One timestamp for both fields so that expiry is exactly 72h later.
    now = datetime.now()
    shops[shop] = {
        "activated": now.isoformat(),
        "expiry": (now + timedelta(hours=72)).isoformat(),
        "mode": mode  # "php+crowdsec" or "php-only"
    }
    # Write to a temp file and rename it, so a concurrent reader never sees a
    # half-written registry.
    tmp_path = f"{ACTIVE_SHOPS_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(shops, f, indent=2)
    os.replace(tmp_path, ACTIVE_SHOPS_FILE)
def get_shop_mode(shop):
    """Get the blocking mode for a shop.

    Args:
        shop: Shop name as used as key in ACTIVE_SHOPS_FILE.

    Returns:
        The recorded mode, or "php+crowdsec" when the registry is missing,
        unreadable, malformed, or has no mode for *shop*.
    """
    default_mode = "php+crowdsec"
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return default_mode
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
        return shops.get(shop, {}).get("mode", default_mode)
    except (OSError, ValueError, AttributeError):
        # OSError: unreadable file; ValueError: invalid JSON or encoding;
        # AttributeError: registry or entry is not a JSON object.
        return default_mode
def get_shop_activation_time(shop):
    """Get the activation timestamp for a shop.

    Args:
        shop: Shop name as used as key in ACTIVE_SHOPS_FILE.

    Returns:
        The "activated" value parsed as a datetime, or None when the registry
        is missing, unreadable or malformed, or holds no valid timestamp for
        *shop*.
    """
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return None
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
        activated_str = shops.get(shop, {}).get("activated")
        if activated_str:
            return datetime.fromisoformat(activated_str)
    except (OSError, ValueError, TypeError, AttributeError):
        # OSError: unreadable file; ValueError: invalid JSON or timestamp;
        # TypeError: timestamp is not a string; AttributeError: registry or
        # entry is not a JSON object.
        pass
    return None
def format_duration(minutes):
    """Render a duration given in minutes as a short string.

    Below one hour the result is "<m>m", below one day "<h>h <m>m", and from
    one day on "<d>d <h>h". Fractions are truncated.
    """
    if minutes < 60:
        return f"{int(minutes)}m"
    total_hours = minutes / 60
    if total_hours >= 24:
        whole_days = int(total_hours / 24)
        leftover_hours = int(total_hours % 24)
        return f"{whole_days}d {leftover_hours}h"
    leftover_minutes = int(minutes % 60)
    return f"{int(total_hours)}h {leftover_minutes}m"
def remove_shop_from_active(shop):
    """Delete *shop* from the registry in ACTIVE_SHOPS_FILE.

    Does nothing when the registry file does not exist. Otherwise the file is
    rewritten, whether or not the shop was present.
    """
    if os.path.isfile(ACTIVE_SHOPS_FILE):
        with open(ACTIVE_SHOPS_FILE, 'r') as registry:
            tracked = json.load(registry)
        if shop in tracked:
            tracked.pop(shop)
        with open(ACTIVE_SHOPS_FILE, 'w') as registry:
            json.dump(tracked, registry, indent=2)
def cleanup_crowdsec_decisions(shop):
    """Remove CrowdSec decisions for a shop.

    Lists all decisions in raw (comma separated) form, selects those with a
    field ending in "blocked by <shop>" (the reason text the watcher sets) and
    deletes them by IP. The listing is repeated, at most 50 times with up to
    100 deletions each, until nothing matching is left.

    The filtering is done in Python rather than with a shell ``grep``: the
    shop name is never interpolated into a shell command, and "example.com"
    no longer also matches decisions of "shop.example.com". IPs taken from the
    listing are validated and quoted before they reach the shell.

    Args:
        shop: Shop name as used in the decision reason.
    """
    import ipaddress
    import shlex

    if not check_crowdsec():
        return
    print(f" 🔍 Entferne CrowdSec-Decisions für {shop}...")
    marker = f"blocked by {shop}"
    total_removed = 0
    max_iterations = 50  # Safety limit
    for iteration in range(1, max_iterations + 1):
        # Get all decisions with --limit 0 (no pagination)
        code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
        if code != 0 or not stdout.strip():
            break
        matching = [
            line for line in stdout.splitlines()
            if any(field.strip().strip('"').endswith(marker) for field in line.split(','))
        ]
        if not matching:
            break  # No more decisions found
        batch_count = 0
        for line in matching[:100]:  # Process max 100 per iteration
            parts = line.split(',')
            if len(parts) < 3:
                continue
            ip_field = parts[2].strip()
            # The field is either a bare address or "<scope>:<address>".
            candidates = [ip_field]
            if ':' in ip_field:
                candidates.append(ip_field.split(':', 1)[1])
            ip = None
            for candidate in candidates:
                try:
                    ipaddress.ip_address(candidate)
                except ValueError:
                    continue
                ip = candidate
                break
            if ip is None:
                continue
            del_code, _, _ = run_command(f"cscli decisions delete --ip {shlex.quote(ip)}")
            if del_code == 0:
                batch_count += 1
        total_removed += batch_count
        if batch_count == 0:
            break  # Nothing deleted in this iteration
        # Small progress indicator for large cleanups
        if iteration > 1:
            print(f" ... {total_removed} Decisions entfernt (Durchlauf {iteration})")
    if total_removed > 0:
        print(f"{total_removed} Decisions entfernt")
    else:
        print(" Keine Decisions gefunden")
def get_available_shops():
    """Return the sorted names of all vhosts that look like a shop.

    A vhost qualifies when it is a directory below VHOSTS_DIR, is not one of
    the reserved names ('chroot', 'system', 'default') and contains
    ``httpdocs/index.php``. Returns an empty list if VHOSTS_DIR is missing.
    """
    if not os.path.exists(VHOSTS_DIR):
        return []
    reserved = ('chroot', 'system', 'default')
    found = []
    for name in os.listdir(VHOSTS_DIR):
        if name in reserved:
            continue
        vhost_dir = os.path.join(VHOSTS_DIR, name)
        if not os.path.isdir(vhost_dir):
            continue
        # isfile() on the full path also implies that httpdocs is a directory
        if os.path.isfile(os.path.join(vhost_dir, 'httpdocs', 'index.php')):
            found.append(name)
    found.sort()
    return found
def get_active_shops():
    """Return the available shops that currently carry blocking artefacts.

    A shop counts as active when its httpdocs contains either the generated
    blocking script or the backup copy of index.php.
    """
    def _has_blocking_artefact(shop):
        docroot = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        if os.path.isfile(os.path.join(docroot, BLOCKING_FILE)):
            return True
        return os.path.isfile(os.path.join(docroot, f'index.php{BACKUP_SUFFIX}'))

    return [shop for shop in get_available_shops() if _has_blocking_artefact(shop)]
def activate_blocking(shop, silent=False, mode="php+crowdsec"):
    """Activate GeoIP blocking for a single shop.

    Steps: optionally install the watcher service (first shop in
    "php+crowdsec" mode, CrowdSec running), back up index.php, write the
    blocking script, add a ``require_once`` for it to index.php, and register
    the shop in the active-shops registry.

    The blocking script is written BEFORE index.php is patched, so the site
    never requires a file that does not exist yet. If any step after the
    backup fails, index.php is restored from the backup, the blocking script
    is removed and the exception is re-raised, so the shop is not left in a
    half-activated state.

    The expiry is handed to PHP as a Unix timestamp ("@<epoch>"), which
    ``strtotime`` evaluates independently of PHP's configured timezone.

    Args:
        shop: Vhost directory name below VHOSTS_DIR.
        silent: Suppress progress output when true.
        mode: "php+crowdsec" (log + CrowdSec queue) or "php-only".

    Returns:
        True on success; False when blocking is already active (backup file
        exists) or index.php is missing.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
    if os.path.isfile(backup_php):
        if not silent:
            print(f"⚠️ GeoIP-Blocking bereits aktiv für {shop}")
        return False
    if not os.path.isfile(index_php):
        if not silent:
            print(f"❌ index.php nicht gefunden")
        return False
    if not silent:
        print(f"\n🔧 Aktiviere DACH GeoIP-Blocking für: {shop}")
        print(" (Erlaubt: Deutschland, Österreich, Schweiz)")
        print(f" Modus: {'PHP + CrowdSec' if mode == 'php+crowdsec' else 'Nur PHP'}")
        print("=" * 60)
    # Step 1: Install watcher service if not exists (only for php+crowdsec mode)
    if mode == "php+crowdsec":
        active_shops = get_active_shops()
        # Check if any shop uses crowdsec mode
        crowdsec_shops = [s for s in active_shops if get_shop_mode(s) == "php+crowdsec"]
        if not crowdsec_shops:  # First shop with crowdsec
            if not silent:
                print("\n[1/3] Installiere CrowdSec-Watcher-Service...")
            if check_crowdsec():
                install_watcher_service()
            else:
                if not silent:
                    print(" ⚠️ CrowdSec nicht verfügbar - nur PHP-Blocking")
        else:
            if not silent:
                print("\n[1/3] CrowdSec-Watcher-Service bereits aktiv")
    else:
        if not silent:
            print("\n[1/3] CrowdSec-Synchronisation deaktiviert (nur PHP-Modus)")
    # Step 2: PHP blocking
    expiry = datetime.now() + timedelta(hours=72)
    # Label with the zone name that is actually in effect (CET or CEST on a
    # server running on German time) instead of a hard-coded "CET".
    expiry_label = expiry.astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
    template = GEOIP_SCRIPT if mode == "php+crowdsec" else GEOIP_SCRIPT_PHP_ONLY
    # Placeholders a template does not contain are ignored by str.format().
    geoip_content = template.format(
        expiry_date=expiry_label,
        expiry_timestamp=f"@{int(expiry.timestamp())}",
        cache_file=CACHE_FILE,
        log_file=LOG_FILE,
        crowdsec_queue=CROWDSEC_QUEUE_FILE,
        shop_name=shop
    )
    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    if not silent:
        print("\n[2/3] Aktiviere PHP-Blocking...")
        print(" 📋 Backup erstellen...")
    shutil.copy2(index_php, backup_php)
    try:
        with open(blocking_file, 'w', encoding='utf-8') as f:
            f.write(geoip_content)
        if not silent:
            print(" 📝 geoip_blocking.php erstellt")
        with open(index_php, 'r', encoding='utf-8') as f:
            content = f.read()
        lines = content.split('\n')
        # Insert after declare(strict_types=...) if present, otherwise after
        # the first line containing the PHP open tag.
        insert_line = 0
        for i, line in enumerate(lines):
            if 'declare(strict_types' in line:
                insert_line = i + 1
                break
            elif '<?php' in line and insert_line == 0:
                insert_line = i + 1
        if require_statement not in content:
            lines.insert(insert_line, require_statement)
            with open(index_php, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines))
            if not silent:
                print(" ✏️ index.php modifiziert")
        # Step 3: Register shop
        if not silent:
            print("\n[3/3] Registriere Shop...")
        add_shop_to_active(shop, mode)
        if not silent:
            print(" ✅ Shop registriert")
    except Exception:
        # Roll back: a leftover backup would make the shop look "already
        # active" although no blocking is in place.
        shutil.move(backup_php, index_php)
        if os.path.isfile(blocking_file):
            os.remove(blocking_file)
        raise
    if not silent:
        print("\n" + "=" * 60)
        print(f"✅ DACH GeoIP-Blocking aktiviert für: {shop}")
        print(f" Erlaubte Länder: 🇩🇪 DE | 🇦🇹 AT | 🇨🇭 CH")
        print(f" Modus: {'PHP + CrowdSec 🛡️' if mode == 'php+crowdsec' else 'Nur PHP 📝'}")
        print(f" Gültig bis: {expiry_label}")
        print(f" PHP-Log: {os.path.join(httpdocs, LOG_FILE)}")
        if mode == "php+crowdsec":
            print(f" CrowdSec-Queue: {os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)}")
            print(f"\n 🔄 Der Watcher-Service synchronisiert blockierte IPs zu CrowdSec")
        print("=" * 60)
    return True
def deactivate_blocking(shop, silent=False):
    """Turn GeoIP blocking off again for one shop.

    Restores index.php from its backup (or, without a backup, strips every
    line mentioning the blocking script), deletes the generated PHP files,
    deregisters the shop, removes its CrowdSec decisions when it ran in
    "php+crowdsec" mode and CrowdSec is running, and uninstalls the watcher
    service once no shop in that mode is left.

    Args:
        shop: Vhost directory name below VHOSTS_DIR.
        silent: Suppress this function's own progress output when true.

    Returns:
        Always True.
    """
    def say(text):
        if not silent:
            print(text)

    docroot = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_path = os.path.join(docroot, 'index.php')
    backup_path = os.path.join(docroot, f'index.php{BACKUP_SUFFIX}')
    generated_files = [
        os.path.join(docroot, name)
        for name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE)
    ]
    # The mode must be read before the shop is dropped from the registry
    previous_mode = get_shop_mode(shop)
    say(f"\n🔧 Deaktiviere DACH GeoIP-Blocking für: {shop}")
    say(f" Modus war: {'PHP + CrowdSec' if previous_mode == 'php+crowdsec' else 'Nur PHP'}")
    say("=" * 60)

    # Step 1: undo the PHP side
    say("\n[1/4] PHP-Blocking entfernen...")
    if os.path.isfile(backup_path):
        shutil.move(backup_path, index_path)
        say(" 📋 index.php wiederhergestellt")
    elif os.path.isfile(index_path):
        with open(index_path, 'r') as source:
            text = source.read()
        kept_rows = [row for row in text.split('\n') if BLOCKING_FILE not in row]
        with open(index_path, 'w') as target:
            target.write('\n'.join(kept_rows))
    for path in generated_files:
        if os.path.isfile(path):
            os.remove(path)
    say(" 🗑️ PHP-Dateien gelöscht")

    # Step 2: drop the registry entry
    say("\n[2/4] Deregistriere Shop...")
    remove_shop_from_active(shop)
    say(" ✅ Shop deregistriert")

    # Step 3: CrowdSec decisions (only relevant for php+crowdsec mode)
    say("\n[3/4] CrowdSec-Decisions entfernen...")
    if previous_mode == "php+crowdsec" and check_crowdsec():
        cleanup_crowdsec_decisions(shop)
    else:
        say(" Keine CrowdSec-Synchronisation aktiv (PHP-only Modus)")

    # Step 4: the watcher is only needed while some shop uses CrowdSec mode
    say("\n[4/4] Prüfe Watcher-Service...")
    still_syncing = [
        name for name in get_active_shops()
        if get_shop_mode(name) == "php+crowdsec"
    ]
    if still_syncing:
        say(f" Service bleibt aktiv ({len(still_syncing)} Shop(s) mit CrowdSec-Modus)")
    else:
        say(" Keine Shops mit CrowdSec-Modus mehr - deinstalliere Service")
        uninstall_watcher_service()

    say("\n" + "=" * 60)
    say(f"✅ DACH GeoIP-Blocking deaktiviert für: {shop}")
    say("=" * 60)
    return True
def activate_all_shops():
    """Activate GeoIP blocking for all available shops.

    Interactively asks for the blocking mode and a confirmation, then
    activates every shop that is not active yet and prints a summary.

    The per-shop work is delegated to activate_blocking() instead of being
    duplicated here, so both entry points always perform the same steps
    (including installing the watcher service for the first shop in
    "php+crowdsec" mode).
    """
    shops = get_available_shops()
    active_shops = get_active_shops()
    available_shops = [s for s in shops if s not in active_shops]
    if not available_shops:
        print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
        print(" Alle Shops haben bereits aktives GeoIP-Blocking")
        return
    print(f"\n{'=' * 60}")
    print(f" DACH GeoIP-Blocking für ALLE Shops aktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 Folgende {len(available_shops)} Shop(s) werden aktiviert:\n")
    for shop in available_shops:
        print(f"{shop}")
    # Ask for mode
    print(f"\n🔧 Wähle den Blocking-Modus:")
    print(f" [1] PHP + CrowdSec (IPs werden an CrowdSec gemeldet)")
    print(f" [2] Nur PHP (keine CrowdSec-Synchronisation)")
    mode_choice = input(f"\nModus wählen [1/2]: ").strip()
    if mode_choice == "2":
        mode = "php-only"
        mode_display = "Nur PHP 📝"
    else:
        mode = "php+crowdsec"
        mode_display = "PHP + CrowdSec 🛡️"
    print(f"\n⚠️ Dies aktiviert den Schutz für alle oben genannten Shops!")
    print(f" Modus: {mode_display}")
    confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
    if confirm not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return
    print(f"\n{'=' * 60}")
    print(f" Starte Aktivierung ({mode_display})...")
    print(f"{'=' * 60}")
    success_count = 0
    failed_count = 0
    failed_shops = []
    for i, shop in enumerate(available_shops, 1):
        print(f"\n[{i}/{len(available_shops)}] Aktiviere: {shop}")
        try:
            httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
            index_php = os.path.join(httpdocs, 'index.php')
            backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
            # Pre-checks stay here so "already active" is reported as a skip
            # and not as a failure.
            if os.path.isfile(backup_php):
                print(f" ⚠️ Bereits aktiv - überspringe")
                continue
            if not os.path.isfile(index_php):
                print(f" ❌ index.php nicht gefunden")
                failed_count += 1
                failed_shops.append(shop)
                continue
            if activate_blocking(shop, silent=True, mode=mode):
                expiry = datetime.now() + timedelta(hours=72)
                print(f" ✅ Aktiviert (bis {expiry.strftime('%Y-%m-%d %H:%M')})")
                success_count += 1
            else:
                print(f" ❌ Fehler: Aktivierung fehlgeschlagen")
                failed_count += 1
                failed_shops.append(shop)
        except Exception as e:
            print(f" ❌ Fehler: {e}")
            failed_count += 1
            failed_shops.append(shop)
    # Summary
    print(f"\n{'=' * 60}")
    print(" ZUSAMMENFASSUNG")
    print(f"{'=' * 60}")
    print(f"\n ✅ Erfolgreich aktiviert: {success_count}")
    print(f" ❌ Fehlgeschlagen: {failed_count}")
    if failed_shops:
        print(f"\n Fehlgeschlagene Shops:")
        for shop in failed_shops:
            print(f"{shop}")
    print(f"\n 🇩🇪 🇦🇹 🇨🇭 Nur DACH-Traffic erlaubt")
    print(f" 🔧 Modus: {mode_display}")
    print(f" ⏰ Gültig für 72 Stunden")
    print(f"{'=' * 60}")
def deactivate_all_shops():
    """Deactivate GeoIP blocking for all active shops.

    After an interactive confirmation, every active shop gets its index.php
    restored, its generated files deleted, its registry entry removed and its
    CrowdSec decisions cleaned up (when CrowdSec is running). A summary is
    printed at the end.

    The watcher service is only uninstalled when no shop in "php+crowdsec"
    mode is still active afterwards; a shop whose deactivation failed may
    still be writing to its queue and needs the watcher.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine Shops mit aktivem GeoIP-Blocking gefunden")
        return
    print(f"\n{'=' * 60}")
    print(f" DACH GeoIP-Blocking für ALLE Shops deaktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 Folgende {len(active_shops)} Shop(s) werden deaktiviert:\n")
    for shop in active_shops:
        print(f"{shop}")
    print(f"\n⚠️ Dies deaktiviert den Schutz für alle oben genannten Shops!")
    print(f"⚠️ Alle zugehörigen CrowdSec-Decisions werden ebenfalls entfernt!")
    confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
    if confirm not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return
    print(f"\n{'=' * 60}")
    print(" Starte Deaktivierung...")
    print(f"{'=' * 60}")
    success_count = 0
    failed_count = 0
    failed_shops = []
    for i, shop in enumerate(active_shops, 1):
        print(f"\n[{i}/{len(active_shops)}] Deaktiviere: {shop}")
        try:
            httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
            index_php = os.path.join(httpdocs, 'index.php')
            backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
            generated_files = [
                os.path.join(httpdocs, name)
                for name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE)
            ]
            # Restore backup
            if os.path.isfile(backup_php):
                shutil.move(backup_php, index_php)
                print(f" 📋 index.php wiederhergestellt")
            elif os.path.isfile(index_php):
                # No backup: strip the require line. Activation reads and
                # writes index.php as UTF-8, so do the same here.
                with open(index_php, 'r', encoding='utf-8') as f:
                    content = f.read()
                lines = [line for line in content.split('\n') if BLOCKING_FILE not in line]
                with open(index_php, 'w', encoding='utf-8') as f:
                    f.write('\n'.join(lines))
            # Remove files
            for path in generated_files:
                if os.path.isfile(path):
                    os.remove(path)
            # Remove from tracking
            remove_shop_from_active(shop)
            # Clean CrowdSec decisions
            if check_crowdsec():
                cleanup_crowdsec_decisions(shop)
            print(f" ✅ Deaktiviert")
            success_count += 1
        except Exception as e:
            print(f" ❌ Fehler: {e}")
            failed_count += 1
            failed_shops.append(shop)
    # Uninstall watcher service, unless a shop that is still active needs it
    remaining_crowdsec = [
        s for s in get_active_shops() if get_shop_mode(s) == "php+crowdsec"
    ]
    if remaining_crowdsec:
        print(f"\n⚠️ Service bleibt aktiv ({len(remaining_crowdsec)} Shop(s) mit CrowdSec-Modus)")
    else:
        print("\n📦 Deinstalliere CrowdSec-Watcher-Service...")
        uninstall_watcher_service()
    # Summary
    print(f"\n{'=' * 60}")
    print(" ZUSAMMENFASSUNG")
    print(f"{'=' * 60}")
    print(f"\n ✅ Erfolgreich deaktiviert: {success_count}")
    print(f" ❌ Fehlgeschlagen: {failed_count}")
    if failed_shops:
        print(f"\n Fehlgeschlagene Shops:")
        for shop in failed_shops:
            print(f"{shop}")
    print(f"\n 🌍 Alle IPs sind nun wieder erlaubt")
    print(f"{'=' * 60}")
def get_shop_log_stats(shop):
    """Collect block statistics from a shop's PHP blocking log.

    Every line of ``<VHOSTS_DIR>/<shop>/httpdocs/<LOG_FILE>`` counts as one
    block. Lines are expected to look like
    ``[timestamp] IP: x.x.x.x | UA: user_agent | URI: /path``. A line without
    an ``IP: `` marker (or with an empty IP) is still counted as a block but
    is not attributed to any address.

    Args:
        shop: Shop directory name below ``VHOSTS_DIR``.

    Returns:
        A tuple ``(php_blocks, ips, activation_time)``:
        ``php_blocks`` is the number of log lines (0 if the log is missing),
        ``ips`` maps each IP to ``{'count': N, 'ua': user_agent}``, and
        ``activation_time`` is whatever ``get_shop_activation_time(shop)``
        returns.
    """
    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    php_blocks = 0
    ips = {}  # ip -> {'count': N, 'ua': user_agent}

    if os.path.isfile(log_file):
        # The log carries request-derived text (User-Agent, URI) that is not
        # guaranteed to be valid UTF-8. Decode leniently so a single odd byte
        # cannot abort the whole statistics run with UnicodeDecodeError.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                php_blocks += 1

                if 'IP: ' not in line:
                    continue
                # The membership test above guarantees index 1 exists.
                ip = line.split('IP: ')[1].split(' |')[0].strip()
                if not ip:
                    continue

                ua = 'Unknown'
                if 'UA: ' in line:
                    ua = line.split('UA: ')[1].split(' |')[0].strip()

                entry = ips.setdefault(ip, {'count': 0, 'ua': ua})
                entry['count'] += 1
                # Upgrade a placeholder UA as soon as a real one shows up.
                if ua != 'Unknown' and entry['ua'] == 'Unknown':
                    entry['ua'] = ua

    return php_blocks, ips, get_shop_activation_time(shop)
def get_crowdsec_stats_by_shop():
    """Count CrowdSec decisions per active shop.

    Runs ``cscli decisions list -o raw --limit 0`` and, for every row after
    the header, credits the first active shop whose name occurs anywhere in
    that row.

    Returns:
        A dict mapping shop name to the number of matching decision rows.
        Shops without any match are absent. The dict is empty when
        ``check_crowdsec()`` is false, the command fails, or it prints nothing.
    """
    if not check_crowdsec():
        return {}

    code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
    if code != 0 or not stdout:
        return {}

    # Loop-invariant: look the shop list up once instead of once per row.
    active_shops = get_active_shops()

    stats = {}
    for line in stdout.strip().split('\n')[1:]:  # Skip header
        # NOTE(review): plain substring match on the whole row; a shop whose
        # name is contained in another shop's name can absorb its rows.
        for shop in active_shops:
            if shop in line:
                stats[shop] = stats.get(shop, 0) + 1
                break
    return stats
def show_all_logs():
    """Print one aggregated block report covering every active shop.

    The report contains the total and per-shop PHP block counts with their
    requests-per-minute rate, the CrowdSec ban counts per shop, and the 100
    most frequently blocked IPs across all shops together with the bot name
    derived from their user agent. Waits for Enter before returning.
    """
    shops = get_active_shops()
    if not shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    print(f"\n{'' * 70}")
    print(" 📊 GESAMTÜBERSICHT ALLER SHOPS")
    print(f"{'' * 70}")

    per_shop = {}      # shop -> blocks / activation / runtime_minutes / req_min
    merged_ips = {}    # ip -> {'count': N, 'ua': user_agent}
    blocks_sum = 0
    longest_runtime = 0  # runtime of the longest-running shop, in minutes

    # Gather the PHP-side numbers shop by shop.
    for name in shops:
        n_blocks, shop_ips, activated = get_shop_log_stats(name)
        blocks_sum += n_blocks

        minutes = 0
        rate = 0
        if activated:
            minutes = (datetime.now() - activated).total_seconds() / 60
            if minutes > 0:
                rate = n_blocks / minutes

        per_shop[name] = {
            'blocks': n_blocks,
            'activation': activated,
            'runtime_minutes': minutes,
            'req_min': rate,
        }
        longest_runtime = max(longest_runtime, minutes)

        for addr, info in shop_ips.items():
            slot = merged_ips.setdefault(addr, {'count': 0, 'ua': info['ua']})
            slot['count'] += info['count']
            # Prefer a real user agent over the 'Unknown' placeholder.
            if slot['ua'] == 'Unknown' and info['ua'] != 'Unknown':
                slot['ua'] = info['ua']

    # The overall rate is measured against the longest-running shop.
    overall_rate = blocks_sum / longest_runtime if longest_runtime > 0 else 0

    cs_counts = get_crowdsec_stats_by_shop()
    cs_total = sum(cs_counts.values())

    # PHP blocks section
    print(f"\n📝 PHP-Blocks gesamt: {blocks_sum} (⌀ {overall_rate:.1f} req/min, Laufzeit: {format_duration(longest_runtime)})")
    for name in sorted(per_shop):
        entry = per_shop[name]
        rate = entry['req_min']
        minutes = entry['runtime_minutes']
        bar = "" * min(int(rate * 2), 20) if rate > 0 else ""
        since = format_duration(minutes) if minutes > 0 else "?"
        print(f" ├─ {name}: {entry['blocks']} ({rate:.1f} req/min, seit {since}) {bar}")

    # CrowdSec section
    print(f"\n🛡️ CrowdSec-Bans gesamt: {cs_total}")
    if cs_counts:
        for name in sorted(cs_counts):
            bans = cs_counts[name]
            bar = "" * min(bans // 10, 20) if bans > 0 else ""
            print(f" ├─ {name}: {bans} {bar}")
    elif check_crowdsec():
        print(" └─ Keine aktiven Bans")
    else:
        print(" └─ CrowdSec nicht verfügbar")

    # Most frequently blocked addresses, busiest first
    if merged_ips:
        print(f"\n🔥 Top 100 blockierte IPs (alle Shops):")
        ranking = sorted(merged_ips.items(), key=lambda item: item[1]['count'], reverse=True)
        for addr, info in ranking[:100]:
            hits = info['count']
            bot_label = detect_bot(info['ua'])
            bar = "" * min(hits // 5, 20) if hits > 0 else ""
            print(f" {addr} ({bot_label}): {hits} {bar}")

    print(f"\n{'' * 70}")
    # Keep the report on screen until the user acknowledges it.
    input("\nDrücke Enter um fortzufahren...")
def show_logs(shop):
    """Print the block report for a single shop.

    Shows activation time, runtime and block rate, the last 50 lines of the
    shop's PHP blocking log, the 20 most frequently blocked IPs with their
    detected bot name and, when the shop runs in ``php+crowdsec`` mode and
    CrowdSec is available, the CrowdSec decisions whose row mentions the shop.

    Args:
        shop: Shop directory name below ``VHOSTS_DIR``.
    """
    # Local imports: only this function needs them.
    import csv
    from collections import deque

    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    shop_mode = get_shop_mode(shop)

    # Get stats
    blocks, ips, activation_time = get_shop_log_stats(shop)

    # Calculate runtime and req/min
    if activation_time:
        runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
        req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
        runtime_str = format_duration(runtime_minutes)
        activation_str = activation_time.strftime('%Y-%m-%d %H:%M:%S')
    else:
        req_min = 0
        runtime_str = "unbekannt"
        activation_str = "unbekannt"

    mode_display = "PHP + CrowdSec 🛡️" if shop_mode == "php+crowdsec" else "Nur PHP 📝"
    print(f"\n{'' * 70}")
    print(f"📊 Logs für {shop} [{mode_display}]")
    print(f"{'' * 70}")
    print(f"\n⏱️ Aktiviert: {activation_str}")
    print(f"⏱️ Laufzeit: {runtime_str}")
    print(f"📈 Blocks: {blocks} ({req_min:.1f} req/min)")

    if os.path.isfile(log_file):
        print(f"\n📝 Letzte 50 PHP-Blocks:")
        print("=" * 70)
        # Stream the file and keep only the tail so a large log is never held
        # in memory as a whole. Decode leniently: the log contains
        # request-derived text that need not be valid UTF-8.
        total_lines = 0
        tail = deque(maxlen=50)
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                total_lines += 1
                tail.append(line)
        for line in tail:
            print(line.rstrip())
        print("=" * 70)
        print(f"Gesamt: {total_lines}")

        # Show top IPs with bot detection
        if ips:
            print(f"\n🔥 Top 20 blockierte IPs:")
            sorted_ips = sorted(ips.items(), key=lambda x: x[1]['count'], reverse=True)[:20]
            for ip, data in sorted_ips:
                count = data['count']
                bot_name = detect_bot(data['ua'])
                bar = "" * min(count // 5, 20) if count > 0 else ""
                print(f" {ip} ({bot_name}): {count} {bar}")
    else:
        print(f"\n Keine PHP-Logs für {shop}")

    # Only show CrowdSec decisions if mode is php+crowdsec
    if shop_mode == "php+crowdsec" and check_crowdsec():
        print(f"\n🛡️ CrowdSec Decisions:")
        print("=" * 70)
        # Use raw output with --limit 0 (no pagination)
        code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
        if code == 0 and stdout:
            rows = stdout.strip().split('\n')[1:]  # Skip header
            shop_decisions = [row for row in rows if shop in row]
            if shop_decisions:
                print(f"Aktive Bans: {len(shop_decisions)}")
                print("\nLetzte 20 Bans:")
                # NOTE(review): this takes the first 20 matching rows in the
                # order cscli printed them; whether those are the most recent
                # bans depends on cscli's ordering, which is not checked here.
                for row in shop_decisions[:20]:
                    # Parse as CSV so a quoted field containing a comma does
                    # not shift the column positions.
                    parts = next(csv.reader([row]), [])
                    if len(parts) > 8:
                        # Column 2: ip (format "Ip:1.2.3.4")
                        ip_field = parts[2].strip()
                        _, sep, remainder = ip_field.partition(':')
                        ip = remainder if sep else ip_field
                        # Column 8: expiration
                        expiry = parts[8].strip()
                        print(f" 🚫 {ip} (bis {expiry})")
                if len(shop_decisions) > 20:
                    print(f" ... und {len(shop_decisions) - 20} weitere")
            else:
                print("Keine aktiven CrowdSec-Bans für diesen Shop")
        else:
            print("Konnte Decisions nicht abrufen")
        print("=" * 70)
    elif shop_mode == "php-only":
        print(f"\n📝 CrowdSec-Synchronisation ist für diesen Shop deaktiviert (PHP-only Modus)")
def _menu_confirmed(prompt):
    """Ask a yes/no question; return True for ja/j/yes/y (case-insensitive)."""
    return input(prompt).strip().lower() in ('ja', 'j', 'yes', 'y')


def _menu_pick_shop(shops, raw_choice):
    """Map a 1-based menu number to a shop; print an error and return None if invalid."""
    # Only the int() conversion is guarded, so a ValueError raised by later
    # work is never misreported as invalid input.
    try:
        index = int(raw_choice) - 1
    except ValueError:
        print("❌ Ungültig")
        return None
    if 0 <= index < len(shops):
        return shops[index]
    print("❌ Ungültig")
    return None


def _menu_print_active_shops(active_shops):
    """Print the numbered list of active shops with an icon for their mode."""
    for i, shop in enumerate(active_shops, 1):
        mode_icon = "🛡️" if get_shop_mode(shop) == "php+crowdsec" else "📝"
        print(f" [{i}] {shop} {mode_icon}")


def _menu_activate_single():
    """Menu option 1: pick an inactive shop and a mode, confirm, then activate."""
    shops = get_available_shops()
    active_shops = get_active_shops()
    available_shops = [s for s in shops if s not in active_shops]
    if not available_shops:
        print("\n⚠️ Keine Shops verfügbar")
        return

    print("\n📋 Verfügbare Shops:")
    for i, shop in enumerate(available_shops, 1):
        print(f" [{i}] {shop}")

    selected_shop = _menu_pick_shop(available_shops, input("\nWähle einen Shop: ").strip())
    if selected_shop is None:
        return

    # Ask for mode; anything other than "2" selects PHP + CrowdSec.
    print(f"\n🔧 Wähle den Blocking-Modus:")
    print(f" [1] PHP + CrowdSec (IPs werden an CrowdSec gemeldet)")
    print(f" [2] Nur PHP (keine CrowdSec-Synchronisation)")
    if input(f"\nModus wählen [1/2]: ").strip() == "2":
        mode, mode_display = "php-only", "Nur PHP"
    else:
        mode, mode_display = "php+crowdsec", "PHP + CrowdSec"

    if _menu_confirmed(f"\n⚠️ DACH-Blocking ({mode_display}) aktivieren für '{selected_shop}'? (ja/nein): "):
        activate_blocking(selected_shop, mode=mode)


def _menu_deactivate_single():
    """Menu option 2: pick an active shop, confirm, then deactivate it."""
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    print("\n📋 Aktive Shops:")
    _menu_print_active_shops(active_shops)

    selected_shop = _menu_pick_shop(active_shops, input("\nWähle einen Shop: ").strip())
    if selected_shop is None:
        return
    if _menu_confirmed(f"\n⚠️ Deaktivieren für '{selected_shop}'? (ja/nein): "):
        deactivate_blocking(selected_shop)


def _menu_show_logs():
    """Menu option 3: show the combined report (0) or one shop's logs (1..n)."""
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    print("\n📋 Logs anzeigen für:")
    print(f" [0] 📊 ALLE Shops (Zusammenfassung)")
    _menu_print_active_shops(active_shops)

    try:
        number = int(input("\nWähle eine Option: ").strip())
    except ValueError:
        print("❌ Ungültig")
        return

    if number == 0:
        show_all_logs()
    elif 1 <= number <= len(active_shops):
        show_logs(active_shops[number - 1])
    else:
        print("❌ Ungültig")


def _menu_show_status():
    """Menu option 4: print shop totals and one statistics line per active shop."""
    shops = get_available_shops()
    active_shops = get_active_shops()
    print(f"\n📊 Status:")
    print(f" Shops gesamt: {len(shops)}")
    print(f" Aktive DACH-Blockings: {len(active_shops)}")

    for shop in active_shops:
        mode = get_shop_mode(shop)
        mode_icon = "🛡️" if mode == "php+crowdsec" else "📝"
        mode_text = "PHP+CS" if mode == "php+crowdsec" else "PHP"
        # Get stats
        blocks, _, activation_time = get_shop_log_stats(shop)
        if activation_time:
            runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
            req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
            runtime_str = format_duration(runtime_minutes)
        else:
            req_min = 0
            runtime_str = "?"
        print(f"{shop} [{mode_text}] {mode_icon} - {blocks} blocks ({req_min:.1f} req/min, {runtime_str})")


def main():
    """Run the interactive main menu until the user chooses to quit.

    Prints a banner with the CrowdSec and watcher-service status, then loops:
    show the menu, read an option and dispatch it. Option ``0`` leaves the
    loop; an unknown option is reported as invalid and the menu is shown
    again.
    """
    print("\n" + "=" * 60)
    print(" GeoIP Shop Blocker Manager - DACH Version")
    print(" Erlaubt: 🇩🇪 Deutschland | 🇦🇹 Österreich | 🇨🇭 Schweiz")
    print(" PHP + CrowdSec Watcher (systemd service)")
    print("=" * 60)

    if check_crowdsec():
        print(" ✅ CrowdSec: Aktiv")
    else:
        print(" ⚠️ CrowdSec: Nicht verfügbar")

    # Check service status
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    if code == 0 and stdout.strip() == "active":
        print(" ✅ Watcher-Service: Läuft")
    else:
        print(" ⚠️ Watcher-Service: Nicht aktiv")

    actions = {
        "1": _menu_activate_single,
        "2": _menu_deactivate_single,
        "3": _menu_show_logs,
        "4": _menu_show_status,
        "5": activate_all_shops,
        "6": deactivate_all_shops,
    }

    while True:
        print("\n" + "-" * 40)
        print("[1] GeoIP-Blocking AKTIVIEREN (einzeln)")
        print("[2] GeoIP-Blocking DEAKTIVIEREN (einzeln)")
        print("[3] Logs anzeigen")
        print("[4] Status anzeigen")
        print("-" * 40)
        print("[5] 🚀 ALLE Shops aktivieren")
        print("[6] 🛑 ALLE Shops deaktivieren")
        print("-" * 40)
        print("[0] Beenden")

        choice = input("\nWähle eine Option: ").strip()
        if choice == "0":
            print("\n👋 Auf Wiedersehen!")
            break

        action = actions.get(choice)
        if action is None:
            # Same feedback the sub-menus give for an invalid selection.
            print("❌ Ungültig")
            continue
        action()
if __name__ == "__main__":
    # Refuse to run without root.
    # NOTE(review): presumably required because the tool edits files below
    # VHOSTS_DIR and manages a systemd unit — confirm.
    if os.geteuid() != 0:
        print("❌ Als root ausführen!")
        sys.exit(1)
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C, or Ctrl+D / closed stdin at any input() prompt: leave
        # quietly instead of dumping a traceback.
        print("\n\n👋 Abgebrochen")
        sys.exit(0)