Files
geoip_shop_manager/geoip_shop_manager.py

1770 lines
64 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GeoIP Shop Blocker Manager - DACH & Eurozone Version
2-Component System: PHP blocking + Python watcher (systemd service)
Supports two geo regions:
- DACH: Germany, Austria, Switzerland (3 countries)
- Eurozone+GB: All Eurozone countries + GB + CH (22 countries)
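Supports four modes:
- php+crowdsec: GeoIP blocking with CrowdSec integration
- php-only: GeoIP blocking without CrowdSec
- bot+crowdsec: Block only bots and report them to CrowdSec, shop remains globally accessible
- bot-only: Block only bots without CrowdSec, shop remains globally accessible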
v3.3.0: Added option to activate only direct shops (not behind Link11)
"""
import os
import sys
import shutil
import subprocess
import json
import time
import re
import socket
from datetime import datetime, timedelta
from pathlib import Path
# ANSI escape sequences used to colour terminal output
COLOR_RESET = "\033[0m"
COLOR_BOLD = "\033[1m"
COLOR_RED = "\033[91m"
COLOR_GREEN = "\033[92m"
COLOR_YELLOW = "\033[93m"
COLOR_BLUE = "\033[94m"

# Address a shop domain is compared against to decide whether it is behind Link11
LINK11_IP = "128.65.223.106"

# Per-process memo of DNS lookups, filled by check_link11()
DNS_CACHE = {}

# Root directory that contains one sub-directory per shop
VHOSTS_DIR = "/var/www/vhosts"

# File names used inside each shop's httpdocs directory
BACKUP_SUFFIX = ".geoip_backup"
BLOCKING_FILE = "geoip_blocking.php"
CACHE_FILE = "geoip_ip_ranges.cache"
LOG_FILE = "geoip_blocked.log"
CROWDSEC_QUEUE_FILE = "geoip_crowdsec_queue.log"

# System-wide files owned by the watcher service
WATCHER_SCRIPT = "/usr/local/bin/geoip_crowdsec_watcher.py"
SYSTEMD_SERVICE = "/etc/systemd/system/geoip-crowdsec-watcher.service"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"

# Minimum expected IP ranges per region (for validation)
MIN_RANGES = {
    "dach": 1000,      # DE+AT+CH should have at least 1000 ranges
    "eurozone": 5000,  # 22 countries should have at least 5000 ranges
}
# =============================================================================
# GEO REGIONS
# =============================================================================
# Region table keyed by region id. Every entry carries a display name, the
# list of allowed lower-case country codes, a description, an icon and a
# short label. The "none" entry has no countries and is used for the
# bot-only modes.
GEO_REGIONS = {
    "dach": {
        "name": "DACH",
        "countries": ["de", "at", "ch"],
        "description": "Deutschland, Österreich, Schweiz",
        "icon": "🇩🇪🇦🇹🇨🇭",
        "short": "DACH",
    },
    "eurozone": {
        "name": "Eurozone + GB",
        "countries": [
            "de", "at", "ch",
            "be", "cy", "ee", "es", "fi", "fr", "gb",
            "gr", "hr", "ie", "it", "lt", "lu", "lv", "mt", "nl", "pt", "si", "sk",
        ],
        "description": "22 Länder: DE, AT, CH, BE, CY, EE, ES, FI, FR, GB, GR, HR, IE, IT, LT, LU, LV, MT, NL, PT, SI, SK",
        "icon": "🇪🇺",
        "short": "EU+",
    },
    "none": {
        "name": "Bot-Only",
        "countries": [],
        "description": "Nur Bot-Blocking, weltweit erreichbar",
        "icon": "🤖",
        "short": "BOT",
    },
}
# =============================================================================
# BOT DETECTION
# =============================================================================
# Bot name -> regular expression matched against the User-Agent header.
# Insertion order matters: detect_bot() and the generated PHP script both
# report the first pattern that matches.
BOT_PATTERNS = {
    'GPTBot': r'GPTBot',
    'OAI-SearchBot': r'OAI-SearchBot',
    'ChatGPT-User': r'ChatGPT-User',
    'ClaudeBot': r'ClaudeBot',
    'Claude-User': r'Claude-User',
    'anthropic-ai': r'anthropic-ai',
    'Googlebot': r'Googlebot',
    'Google-Extended': r'Google-Extended',
    'AdsBot-Google': r'AdsBot-Google',
    'Bingbot': r'[Bb]ingbot',
    'BingPreview': r'BingPreview',
    'msnbot': r'msnbot',
    'PerplexityBot': r'PerplexityBot',
    'Applebot': r'Applebot',
    'Amazonbot': r'Amazonbot',
    'FacebookBot': r'facebookexternalhit|FacebookBot',
    'Bytespider': r'Bytespider',
    'DuckDuckBot': r'DuckDuckBot',
    'YandexBot': r'YandexBot',
    'Baiduspider': r'Baiduspider',
    'AhrefsBot': r'AhrefsBot',
    'SemrushBot': r'SemrushBot',
    'MJ12bot': r'MJ12bot',
    'DotBot': r'DotBot',
    'PetalBot': r'PetalBot',
    'DataForSeoBot': r'DataForSeoBot',
    'LinkedInBot': r'LinkedInBot',
    'Twitterbot': r'Twitterbot',
    'Slackbot': r'Slackbot',
    'UptimeRobot': r'UptimeRobot',
    'Pingdom': r'Pingdom',
    'curl': r'^curl/',
    'python-requests': r'python-requests',
    'Wget': r'Wget',
    'Scrapy': r'Scrapy',
}
def detect_bot(user_agent):
    """Return the name of the first BOT_PATTERNS entry matching *user_agent*.

    Patterns are tried in dictionary order and matched case-insensitively.
    'Unbekannt' is returned when the User-Agent is empty, equals the
    placeholder 'Unknown', or matches no pattern.
    """
    if user_agent and user_agent != 'Unknown':
        matching_names = (
            name
            for name, regex in BOT_PATTERNS.items()
            if re.search(regex, user_agent, re.IGNORECASE)
        )
        return next(matching_names, 'Unbekannt')
    return 'Unbekannt'
def check_link11(domain):
    """Resolve *domain* and report whether it points at the Link11 address.

    Returns a dict ``{'is_link11': bool, 'ip': str}``. Results are memoised
    in DNS_CACHE, so each domain is resolved at most once per process.
    When the name cannot be resolved the result is
    ``{'is_link11': False, 'ip': 'N/A'}``.
    """
    cached = DNS_CACHE.get(domain)
    if cached is not None:
        return cached
    try:
        ip = socket.gethostbyname(domain)
    except (OSError, UnicodeError):
        # OSError covers socket.gaierror/herror/timeouts. UnicodeError is
        # raised for names that are not valid IDNA hostnames (for example an
        # over-long or empty label); such a name must not abort the caller.
        result = {'is_link11': False, 'ip': 'N/A'}
    else:
        result = {'is_link11': ip == LINK11_IP, 'ip': ip}
    DNS_CACHE[domain] = result
    return result
def format_shop_with_link11(shop, prefix="", show_index=None):
    """Return *shop* as a coloured label tagged with its Link11 status.

    Shops behind Link11 are green with the tag " [Link11]", all others red
    with " [Direkt]". When *show_index* is given the label is prefixed with
    " [<index>] " and *prefix* is ignored; otherwise *prefix* is prepended.
    """
    behind_link11 = check_link11(shop)['is_link11']
    color = COLOR_GREEN if behind_link11 else COLOR_RED
    suffix = " [Link11]" if behind_link11 else " [Direkt]"
    label = f"{color}{shop}{suffix}{COLOR_RESET}"
    if show_index is None:
        return f"{prefix}{label}"
    return f" [{show_index}] {label}"
def get_geo_region_info(geo_region):
    """Return the GEO_REGIONS entry for *geo_region*, or the "dach" entry for unknown ids."""
    try:
        return GEO_REGIONS[geo_region]
    except KeyError:
        return GEO_REGIONS["dach"]
def generate_php_countries_array(geo_region):
    """Return the region's country codes as single-quoted, comma-separated PHP array items."""
    codes = get_geo_region_info(geo_region)["countries"]
    return ", ".join(map("'{}'".format, codes))
def generate_php_bot_patterns(bot_patterns=None):
    """Generate the body of a PHP array mapping bot names to PCRE patterns.

    Each entry has the form ``'<name>' => '/<regex>/i'``; entries are joined
    with a comma followed by a newline and a space.

    Args:
        bot_patterns: Optional mapping of bot name to regular expression.
            Defaults to the module-level BOT_PATTERNS.

    Returns:
        The PHP source text for the array entries.
    """
    if bot_patterns is None:
        bot_patterns = BOT_PATTERNS

    def php_single_quoted(text):
        # Inside a PHP single-quoted string only backslash and the quote
        # itself need escaping; backslashes must be handled first.
        return text.replace("\\", "\\\\").replace("'", "\\'")

    entries = []
    for bot_name, pattern in bot_patterns.items():
        # "/" is the PCRE delimiter in the generated code, so an unescaped
        # slash inside the pattern (e.g. "^curl/") would end the regex early
        # and make preg_match() fail.
        delimited = re.sub(r'(?<!\\)/', r'\\/', pattern)
        entries.append(
            f"'{php_single_quoted(bot_name)}' => '/{php_single_quoted(delimited)}/i'"
        )
    return ",\n ".join(entries)
# =============================================================================
# CACHE VALIDATION
# =============================================================================
def generate_and_validate_cache(httpdocs_path, geo_region):
    """Generate the IP ranges cache and validate it.

    Runs a short PHP script that downloads the aggregated zone file of every
    country in the region from ipdeny.com and, when at least the expected
    minimum number of ranges was collected, stores them serialized in the
    shop's cache file.

    Args:
        httpdocs_path: The shop's httpdocs directory; the cache file is
            written there.
        geo_region: Region id used to look up the country list and the
            minimum range count (1000 when the region has no entry).

    Returns:
        Tuple ``(success, range_count, error_message)``; *error_message* is
        None on success.
    """
    import tempfile  # local import: the module's import block is not part of this function

    cache_file = os.path.join(httpdocs_path, CACHE_FILE)
    countries = get_geo_region_info(geo_region)["countries"]
    min_expected = MIN_RANGES.get(geo_region, 1000)
    # The cache path is handed over as a command-line argument ($argv[1])
    # instead of being pasted into a PHP double-quoted string, where "$" or
    # quotes in the path would change the meaning of the script.
    php_script = f'''<?php
$cache_file = $argv[1];
$countries = {json.dumps(countries)};
$ranges = [];
foreach ($countries as $country) {{
    $url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
    $ctx = stream_context_create(['http' => ['timeout' => 30]]);
    $content = @file_get_contents($url, false, $ctx);
    if ($content !== false) {{
        $lines = explode("\\n", trim($content));
        foreach ($lines as $line) {{
            $line = trim($line);
            if (!empty($line) && strpos($line, '/') !== false) {{
                $ranges[] = $line;
            }}
        }}
    }}
}}
if (count($ranges) >= {min_expected}) {{
    if (file_put_contents($cache_file, serialize($ranges)) === false) {{
        echo "WRITEFAIL:" . count($ranges);
    }} else {{
        echo "OK:" . count($ranges);
    }}
}} else {{
    echo "FAIL:" . count($ranges);
}}
'''
    temp_php = None
    try:
        # The generator script lives in the system temp directory, not in the
        # web-served httpdocs directory, so it can never be requested over
        # HTTP and cannot clash with a file of the shop.
        with tempfile.NamedTemporaryFile(
            'w', suffix='.php', delete=False, encoding='utf-8'
        ) as handle:
            temp_php = handle.name
            handle.write(php_script)
        result = subprocess.run(
            ['php', temp_php, cache_file],
            capture_output=True, text=True, timeout=120,
        )
        output = result.stdout.strip()
        if output.startswith('OK:'):
            return True, int(output.split(':')[1]), None
        if output.startswith('FAIL:'):
            found = output.split(':')[1]
            return False, int(found), f"Nur {found} Ranges (min. {min_expected} erwartet)"
        if output.startswith('WRITEFAIL:'):
            return False, int(output.split(':')[1]), "Cache-Datei konnte nicht geschrieben werden"
        return False, 0, f"Unerwartete Ausgabe: {output}"
    except subprocess.TimeoutExpired:
        return False, 0, "Timeout beim Laden der IP-Ranges"
    except Exception as e:
        # Best-effort boundary: any other failure (php missing, unparsable
        # count, I/O error) is reported to the caller instead of raised.
        return False, 0, str(e)
    finally:
        if temp_php and os.path.exists(temp_php):
            os.remove(temp_php)
def validate_existing_cache(httpdocs_path, geo_region):
    """Validate existing cache.

    Lets PHP unserialize the shop's cache file and checks that it holds an
    array with at least the minimum number of ranges expected for the region
    (1000 when the region has no entry in MIN_RANGES).

    Returns:
        Tuple ``(valid, range_count, error_message)``; *error_message* is
        None when the cache is valid.
    """
    cache_file = os.path.join(httpdocs_path, CACHE_FILE)
    min_expected = MIN_RANGES.get(geo_region, 1000)
    if not os.path.exists(cache_file):
        return False, 0, "Cache-Datei existiert nicht"
    # "php -r" expects bare PHP code: an opening "<?php" tag is a parse
    # error there, which made every cache look corrupt. The cache path is
    # passed as an argument ($argv[1]) rather than embedded in the code.
    php_code = '''
$data = @unserialize(@file_get_contents($argv[1]));
if ($data === false || !is_array($data)) { echo "CORRUPT:0"; }
else { echo "OK:" . count($data); }
'''
    try:
        result = subprocess.run(
            ['php', '-r', php_code, '--', cache_file],
            capture_output=True, text=True, timeout=10,
        )
        output = result.stdout.strip()
        if output.startswith('OK:'):
            count = int(output.split(':')[1])
            if count >= min_expected:
                return True, count, None
            return False, count, f"Nur {count} Ranges (min. {min_expected} erwartet)"
        return False, 0, "Cache-Datei ist korrupt"
    except Exception as e:
        # Best-effort boundary: report the failure instead of raising.
        return False, 0, str(e)
# =============================================================================
# PHP TEMPLATES WITH FAIL-OPEN
# =============================================================================
# PHP blocking script for GeoIP mode with CrowdSec reporting. It is a
# str.format() template: literal PHP braces are doubled ({{ }}), and the
# placeholders are region_name, expiry_date, expiry_timestamp, cache_file,
# log_file, crowdsec_queue, min_ranges, countries_array and shop_name.
#
# What the generated script does, in order:
#   * returns without blocking once the expiry time has passed, when
#     REMOTE_ADDR is empty, or when it is a private/reserved address;
#   * loads the allowed CIDR ranges from the cache file (reused for 86400
#     seconds) or downloads them from ipdeny.com;
#   * returns without blocking (fail-open) when fewer than min_ranges ranges
#     are available;
#   * otherwise, for a visitor outside all ranges, appends a line to the log
#     file and to the CrowdSec queue file and answers with HTTP 403.
#
# NOTE(review): ip_in_range() is built on ip2long(), which handles IPv4
# addresses only - IPv6 visitors presumably never match an allowed range and
# are therefore blocked; confirm that this is intended.
GEOIP_SCRIPT_TEMPLATE = '''<?php
/**
* GeoIP Blocking Script - {region_name}
* Valid until: {expiry_date}
* FAIL-OPEN: If cache is corrupt/empty, traffic is allowed through
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) return;
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) return;
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400;
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';
$min_ranges = {min_ranges};
$allowed_countries = [{countries_array}];
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$ctx = stream_context_create(['http' => ['timeout' => 30]]);
$content = @file_get_contents($url, false, $ctx);
if ($content !== false) {{
foreach (explode("\\n", trim($content)) as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) $ranges[] = $line;
}}
}}
}}
return $ranges;
}}
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$mask_long = -1 << (32 - (int)$mask);
return (ip2long($ip) & $mask_long) == (ip2long($subnet) & $mask_long);
}}
$allowed_ranges = [];
$cache_valid = false;
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$cached_data = @file_get_contents($cache_file);
if ($cached_data !== false) {{
$allowed_ranges = @unserialize($cached_data);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
$cache_valid = true;
}} else {{
@unlink($cache_file);
$allowed_ranges = [];
}}
}}
}}
if (!$cache_valid) {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
$cache_valid = true;
}} else {{
error_log("GeoIP FAIL-OPEN: Could not load valid IP ranges (got " . count($allowed_ranges) . ", need $min_ranges)");
return;
}}
}}
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{ $is_allowed = true; break; }}
}}
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$ua = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] IP: $visitor_ip | UA: $ua | URI: $uri\\n", FILE_APPEND | LOCK_EX);
@file_put_contents($crowdsec_queue, "$timestamp|$visitor_ip|{shop_name}\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# PHP blocking script for GeoIP mode without CrowdSec. It is a str.format()
# template: literal PHP braces are doubled ({{ }}), and the placeholders are
# region_name, expiry_date, expiry_timestamp, cache_file, log_file,
# min_ranges and countries_array.
#
# The generated script returns without blocking once expired, when
# REMOTE_ADDR is empty or private/reserved, or when fewer than min_ranges
# ranges can be loaded (fail-open). A visitor outside all allowed ranges is
# written to the log file and answered with HTTP 403; no CrowdSec queue file
# is written.
#
# NOTE(review): ip_in_range() is built on ip2long(), which handles IPv4
# addresses only - IPv6 visitors presumably never match an allowed range and
# are therefore blocked; confirm that this is intended.
GEOIP_SCRIPT_TEMPLATE_PHP_ONLY = '''<?php
/**
* GeoIP Blocking Script (PHP-only) - {region_name}
* Valid until: {expiry_date}
* FAIL-OPEN: If cache is corrupt/empty, traffic is allowed through
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) return;
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) return;
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400;
$log_file = __DIR__ . '/{log_file}';
$min_ranges = {min_ranges};
$allowed_countries = [{countries_array}];
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$ctx = stream_context_create(['http' => ['timeout' => 30]]);
$content = @file_get_contents($url, false, $ctx);
if ($content !== false) {{
foreach (explode("\\n", trim($content)) as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) $ranges[] = $line;
}}
}}
}}
return $ranges;
}}
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$mask_long = -1 << (32 - (int)$mask);
return (ip2long($ip) & $mask_long) == (ip2long($subnet) & $mask_long);
}}
$allowed_ranges = [];
$cache_valid = false;
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$cached_data = @file_get_contents($cache_file);
if ($cached_data !== false) {{
$allowed_ranges = @unserialize($cached_data);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
$cache_valid = true;
}} else {{
@unlink($cache_file);
$allowed_ranges = [];
}}
}}
}}
if (!$cache_valid) {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
$cache_valid = true;
}} else {{
error_log("GeoIP FAIL-OPEN: Could not load valid IP ranges (got " . count($allowed_ranges) . ", need $min_ranges)");
return;
}}
}}
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{ $is_allowed = true; break; }}
}}
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$ua = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] IP: $visitor_ip | UA: $ua | URI: $uri\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# PHP script for bot blocking with CrowdSec reporting. It is a str.format()
# template: literal PHP braces are doubled ({{ }}), and the placeholders are
# expiry_date, expiry_timestamp, log_file, crowdsec_queue, bot_patterns and
# shop_name.
#
# The generated script returns without blocking once expired or when the
# User-Agent header is empty. When the User-Agent matches one of the bot
# patterns (first match wins), the request is written to the log file and to
# the CrowdSec queue file and answered with HTTP 403.
BOT_ONLY_SCRIPT_TEMPLATE = '''<?php
/**
* Bot Blocking Script - Worldwide Access
* Valid until: {expiry_date}
* Blocks known bots/crawlers, allows all human traffic globally
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';
$bot_patterns = [
{bot_patterns}
];
$user_agent = $_SERVER['HTTP_USER_AGENT'] ?? '';
if (empty($user_agent)) return;
$detected_bot = null;
foreach ($bot_patterns as $bot_name => $pattern) {{
if (preg_match($pattern, $user_agent)) {{
$detected_bot = $bot_name;
break;
}}
}}
if ($detected_bot !== null) {{
$timestamp = date('Y-m-d H:i:s');
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] BOT: $detected_bot | IP: $visitor_ip | UA: $user_agent | URI: $uri\\n", FILE_APPEND | LOCK_EX);
@file_put_contents($crowdsec_queue, "$timestamp|$visitor_ip|{shop_name}\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# PHP script for bot blocking without CrowdSec. It is a str.format()
# template: literal PHP braces are doubled ({{ }}), and the placeholders are
# expiry_date, expiry_timestamp, log_file and bot_patterns.
#
# The generated script returns without blocking once expired or when the
# User-Agent header is empty. When the User-Agent matches one of the bot
# patterns (first match wins), the request is written to the log file and
# answered with HTTP 403; no CrowdSec queue file is written.
BOT_ONLY_SCRIPT_TEMPLATE_NO_CROWDSEC = '''<?php
/**
* Bot Blocking Script - Worldwide Access (No CrowdSec)
* Valid until: {expiry_date}
* Blocks known bots/crawlers, allows all human traffic globally
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$log_file = __DIR__ . '/{log_file}';
$bot_patterns = [
{bot_patterns}
];
$user_agent = $_SERVER['HTTP_USER_AGENT'] ?? '';
if (empty($user_agent)) return;
$detected_bot = null;
foreach ($bot_patterns as $bot_name => $pattern) {{
if (preg_match($pattern, $user_agent)) {{
$detected_bot = $bot_name;
break;
}}
}}
if ($detected_bot !== null) {{
$timestamp = date('Y-m-d H:i:s');
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] BOT: $detected_bot | IP: $visitor_ip | UA: $user_agent | URI: $uri\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# =============================================================================
# WATCHER SERVICE
# =============================================================================
# Source of the stand-alone watcher that install_watcher_service() writes to
# WATCHER_SCRIPT. It polls the CrowdSec queue file of every registered shop
# whose mode uses CrowdSec and bans each queued IP for 72h via
# "cscli decisions add".
#
# The text must be valid, correctly indented Python on its own, because it is
# written to disk verbatim. Errors are caught with "except Exception" rather
# than a bare "except", so they are logged and the loop survives them.
#
# NOTE(review): entries appended by PHP between reading and truncating a
# queue file are presumably lost; confirm whether that is acceptable.
WATCHER_SCRIPT_CONTENT = '''#!/usr/bin/env python3
import os, sys, time, subprocess, json
from datetime import datetime

VHOSTS_DIR = "/var/www/vhosts"
QUEUE_FILE = "geoip_crowdsec_queue.log"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
PROCESSED_IPS = {}
CHECK_INTERVAL = 5


def log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)


def get_active_shops():
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return {}
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f)
    except (OSError, ValueError):
        return {}


def add_to_crowdsec(ip, shop):
    now = time.time()
    if ip in PROCESSED_IPS and (now - PROCESSED_IPS[ip]) < 3600:
        return True
    try:
        result = subprocess.run(['cscli', 'decisions', 'add', '--ip', ip, '--duration', '72h', '--type', 'ban', '--reason', f'GeoIP: blocked by {shop}'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            PROCESSED_IPS[ip] = now
            log(f"✅ Added {ip} to CrowdSec (from {shop})")
            return True
    except Exception as exc:
        log(f"cscli failed for {ip} (from {shop}): {exc}")
    return False


def process_queue_file(shop_path, shop):
    queue_file = os.path.join(shop_path, 'httpdocs', QUEUE_FILE)
    if not os.path.isfile(queue_file):
        return 0
    processed = 0
    try:
        with open(queue_file, 'r') as f:
            lines = f.readlines()
        if not lines:
            return 0
        for line in lines:
            parts = line.strip().split('|')
            if len(parts) >= 2 and add_to_crowdsec(parts[1], shop):
                processed += 1
        if processed > 0:
            with open(queue_file, 'w') as f:
                f.write('')
    except Exception as exc:
        log(f"Could not process {queue_file}: {exc}")
    return processed


def main():
    log("🚀 GeoIP CrowdSec Watcher started")
    while True:
        try:
            active_shops = get_active_shops()
            for shop, info in active_shops.items():
                # Only process shops with CrowdSec enabled
                mode = info.get('mode', 'php+crowdsec')
                if mode in ['php+crowdsec', 'bot+crowdsec']:
                    shop_path = os.path.join(VHOSTS_DIR, shop)
                    if os.path.isdir(shop_path):
                        process_queue_file(shop_path, shop)
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            break
        except Exception as exc:
            log(f"Watcher loop error: {exc}")
            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
'''
# systemd unit text that install_watcher_service() writes to SYSTEMD_SERVICE.
# It starts the watcher script with /usr/bin/python3 after network.target and
# crowdsec.service and restarts it 10 seconds after any exit.
# NOTE(review): the ExecStart path is hard-coded here and must stay in sync
# with WATCHER_SCRIPT.
SYSTEMD_SERVICE_CONTENT = '''[Unit]
Description=GeoIP CrowdSec Watcher Service
After=network.target crowdsec.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/geoip_crowdsec_watcher.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
'''
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def run_command(cmd, capture_output=True):
    """Run *cmd* through the shell with a 30 second timeout.

    Returns a tuple ``(returncode, stdout, stderr)``. Without
    *capture_output* both text fields are empty strings. Any exception
    (including the timeout) is reported as ``(-1, "", <message>)``.
    """
    try:
        if not capture_output:
            completed = subprocess.run(cmd, shell=True, timeout=30)
            return completed.returncode, "", ""
        completed = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=30
        )
    except Exception as e:
        return -1, "", str(e)
    return completed.returncode, completed.stdout, completed.stderr
def check_crowdsec():
    """Return True when ``systemctl is-active crowdsec`` exits with 0 and prints "active"."""
    exit_code, state, _stderr = run_command("systemctl is-active crowdsec")
    if exit_code != 0:
        return False
    return state.strip() == "active"
def install_watcher_service():
    """Write the watcher script and its systemd unit, then enable and start the service.

    Returns True when systemd reports the service as active one second after
    the start command, otherwise False.
    """
    print(" 📦 Installiere CrowdSec-Watcher-Service...")

    # Watcher script first (made executable), then the unit file.
    with open(WATCHER_SCRIPT, 'w') as script_handle:
        script_handle.write(WATCHER_SCRIPT_CONTENT)
    os.chmod(WATCHER_SCRIPT, 0o755)
    with open(SYSTEMD_SERVICE, 'w') as unit_handle:
        unit_handle.write(SYSTEMD_SERVICE_CONTENT)

    for command in (
        "systemctl daemon-reload",
        "systemctl enable geoip-crowdsec-watcher.service",
        "systemctl start geoip-crowdsec-watcher.service",
    ):
        run_command(command)

    # Give systemd a moment before asking for the service state.
    time.sleep(1)
    exit_code, state, _stderr = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    started = exit_code == 0 and state.strip() == "active"
    if started:
        print(" ✅ Service gestartet")
    else:
        print(" ⚠️ Service konnte nicht gestartet werden")
    return started
def uninstall_watcher_service():
    """Stop and disable the watcher service, delete its files and reload systemd."""
    print(" 📦 Deinstalliere CrowdSec-Watcher-Service...")
    for verb in ("stop", "disable"):
        run_command(f"systemctl {verb} geoip-crowdsec-watcher.service")
    for installed_path in (SYSTEMD_SERVICE, WATCHER_SCRIPT):
        if os.path.isfile(installed_path):
            os.remove(installed_path)
    run_command("systemctl daemon-reload")
    print(" ✅ Service deinstalliert")
def add_shop_to_active(shop, mode="php+crowdsec", geo_region="dach"):
    """Register *shop* in the active-shops JSON file.

    Stores the activation time, an expiry 72 hours later, the blocking mode
    and the geo region under the shop's name, replacing any earlier entry.
    The parent directory of the registry file is created when missing.

    A registry file that cannot be parsed as a JSON object is treated as
    empty instead of raising, because this function is called after the
    shop's index.php has already been patched.
    """
    os.makedirs(os.path.dirname(ACTIVE_SHOPS_FILE), exist_ok=True)
    shops = {}
    if os.path.isfile(ACTIVE_SHOPS_FILE):
        try:
            with open(ACTIVE_SHOPS_FILE, 'r') as f:
                loaded = json.load(f)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            loaded = None
        if isinstance(loaded, dict):
            shops = loaded
    # One clock reading, so that expiry is exactly activation + 72h.
    now = datetime.now()
    shops[shop] = {
        "activated": now.isoformat(),
        "expiry": (now + timedelta(hours=72)).isoformat(),
        "mode": mode,
        "geo_region": geo_region
    }
    with open(ACTIVE_SHOPS_FILE, 'w') as f:
        json.dump(shops, f, indent=2)
def get_shop_mode(shop):
    """Return the blocking mode registered for *shop*.

    Falls back to "php+crowdsec" when the registry file is missing,
    unreadable or malformed, or when the shop has no "mode" entry.
    """
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f).get(shop, {}).get("mode", "php+crowdsec")
    except (OSError, ValueError, AttributeError, TypeError):
        # OSError: file missing or unreadable; ValueError: invalid JSON;
        # AttributeError/TypeError: JSON of an unexpected shape.
        # A bare "except" would also have swallowed KeyboardInterrupt.
        return "php+crowdsec"
def get_shop_geo_region(shop):
    """Return the geo region registered for *shop*.

    Falls back to "dach" when the registry file is missing, unreadable or
    malformed, or when the shop has no "geo_region" entry.
    """
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f).get(shop, {}).get("geo_region", "dach")
    except (OSError, ValueError, AttributeError, TypeError):
        # OSError: file missing or unreadable; ValueError: invalid JSON;
        # AttributeError/TypeError: JSON of an unexpected shape.
        # A bare "except" would also have swallowed KeyboardInterrupt.
        return "dach"
def get_shop_activation_time(shop):
    """Return the activation time registered for *shop* as a datetime.

    Returns None when the registry file is missing, unreadable or malformed,
    when the shop has no "activated" entry, or when that entry is not a
    valid ISO timestamp.
    """
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            activated_str = json.load(f).get(shop, {}).get("activated")
        return datetime.fromisoformat(activated_str) if activated_str else None
    except (OSError, ValueError, AttributeError, TypeError):
        # OSError: file missing or unreadable; ValueError: invalid JSON or
        # timestamp; AttributeError/TypeError: JSON of an unexpected shape.
        # A bare "except" would also have swallowed KeyboardInterrupt.
        return None
def format_duration(minutes):
    """Format a duration given in minutes as "<m>m", "<h>h <m>m" or "<d>d <h>h".

    Values below one hour use the minutes form, values below one day the
    hours form, everything else the days form. Fractions are truncated.
    """
    if minutes < 60:
        return f"{int(minutes)}m"

    hours = minutes / 60
    if hours >= 24:
        whole_days = int(hours / 24)
        leftover_hours = int(hours % 24)
        return f"{whole_days}d {leftover_hours}h"

    whole_hours = int(hours)
    leftover_minutes = int(minutes % 60)
    return f"{whole_hours}h {leftover_minutes}m"
def remove_shop_from_active(shop):
    """Remove *shop* from the active-shops JSON file.

    Does nothing when the registry file does not exist, cannot be parsed as
    a JSON object, or has no entry for the shop. An unparsable registry must
    not raise here, because this function runs in the middle of
    deactivation and the remaining clean-up steps still have to happen.
    """
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
        return
    if not isinstance(shops, dict) or shop not in shops:
        return
    del shops[shop]
    with open(ACTIVE_SHOPS_FILE, 'w') as f:
        json.dump(shops, f, indent=2)
def cleanup_crowdsec_decisions(shop):
    """Delete the CrowdSec decisions that were created for *shop*.

    Lists all decisions with ``cscli decisions list -o raw --limit 0`` and
    deletes, by IP, those whose reason is exactly
    ``GeoIP: blocked by <shop>`` - the reason the watcher script sets when it
    adds a ban. Works in batches of at most 100 decisions and at most 50
    rounds, and stops as soon as a round deletes nothing. Does nothing when
    CrowdSec is not active.

    cscli is called with argument lists rather than through a shell pipeline
    with grep: the shop name is then neither interpreted by the shell nor
    used as a regular expression, and a shop whose name is contained in
    another shop's name no longer matches that shop's decisions.
    """
    if not check_crowdsec():
        return
    print(f" 🔍 Entferne CrowdSec-Decisions für {shop}...")
    reason = f"GeoIP: blocked by {shop}"
    total_removed = 0
    for _ in range(50):
        try:
            listing = subprocess.run(
                ['cscli', 'decisions', 'list', '-o', 'raw', '--limit', '0'],
                capture_output=True, text=True, timeout=30,
            )
        except (OSError, subprocess.SubprocessError):
            break
        if listing.returncode != 0:
            break

        # Raw output is comma-separated; keep only rows carrying our reason.
        matching_rows = []
        for line in listing.stdout.splitlines():
            fields = [field.strip() for field in line.split(',')]
            if reason in fields:
                matching_rows.append(fields)
        if not matching_rows:
            break

        batch_count = 0
        for fields in matching_rows[:100]:
            if len(fields) < 3:
                continue
            # The third column holds the value, optionally as "<scope>:<ip>".
            ip_field = fields[2]
            ip = ip_field.split(':', 1)[1] if ':' in ip_field else ip_field
            if not ip:
                continue
            try:
                deletion = subprocess.run(
                    ['cscli', 'decisions', 'delete', '--ip', ip],
                    capture_output=True, text=True, timeout=30,
                )
            except (OSError, subprocess.SubprocessError):
                continue
            if deletion.returncode == 0:
                batch_count += 1
        total_removed += batch_count
        if batch_count == 0:
            break
    print(f"{total_removed} Decisions entfernt" if total_removed else " Keine Decisions gefunden")
def get_available_shops():
    """Return the sorted names of all vhost directories that look like a shop.

    A directory counts as a shop when it is not one of 'chroot', 'system' or
    'default' and contains httpdocs/index.php. An empty list is returned
    when the vhosts directory does not exist.
    """
    if not os.path.exists(VHOSTS_DIR):
        return []

    def looks_like_shop(entry):
        shop_path = os.path.join(VHOSTS_DIR, entry)
        if not os.path.isdir(shop_path) or entry in ['chroot', 'system', 'default']:
            return False
        httpdocs = os.path.join(shop_path, 'httpdocs')
        return os.path.isdir(httpdocs) and os.path.isfile(os.path.join(httpdocs, 'index.php'))

    return sorted(entry for entry in os.listdir(VHOSTS_DIR) if looks_like_shop(entry))
def get_active_shops():
    """Return the available shops that currently have blocking installed.

    A shop counts as active when its httpdocs directory contains the
    blocking file or the index.php backup.
    """
    def has_blocking_artifacts(shop):
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        blocking_path = os.path.join(httpdocs, BLOCKING_FILE)
        backup_path = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
        return os.path.isfile(blocking_path) or os.path.isfile(backup_path)

    return [shop for shop in get_available_shops() if has_blocking_artifacts(shop)]
def get_crowdsec_modes():
    """Return a new list with the names of the modes that use CrowdSec."""
    crowdsec_modes = []
    crowdsec_modes.append('php+crowdsec')
    crowdsec_modes.append('bot+crowdsec')
    return crowdsec_modes
def select_geo_region():
    """Ask the user for a geo region; return "eurozone" for the answer "2", otherwise "dach"."""
    dach = GEO_REGIONS['dach']
    eurozone = GEO_REGIONS['eurozone']
    print("\n🌍 Wähle die Geo-Region:")
    print(f" [1] {dach['icon']} DACH - {dach['description']}")
    print(f" [2] {eurozone['icon']} Eurozone+GB - {len(eurozone['countries'])} Länder")
    answer = input("\nRegion wählen [1/2]: ").strip()
    if answer == "2":
        return "eurozone"
    return "dach"
def select_mode():
    """Ask the user for a blocking mode and return its name.

    The answers "2", "3" and "4" select "php-only", "bot+crowdsec" and
    "bot-only"; every other answer selects "php+crowdsec".
    """
    menu = (
        "\n🔧 Wähle den Blocking-Modus:",
        " [1] 🌍 GeoIP + CrowdSec (IPs werden an CrowdSec gemeldet)",
        " [2] 🌍 Nur GeoIP (keine CrowdSec-Synchronisation)",
        " [3] 🤖 Nur Bot-Blocking (weltweit erreichbar, mit CrowdSec)",
        " [4] 🤖 Nur Bot-Blocking (weltweit erreichbar, ohne CrowdSec)",
    )
    for menu_line in menu:
        print(menu_line)
    answer = input("\nModus wählen [1/2/3/4]: ").strip()
    mode_by_answer = {
        "2": "php-only",
        "3": "bot+crowdsec",
        "4": "bot-only",
    }
    return mode_by_answer.get(answer, "php+crowdsec")
def get_mode_icon(mode):
    """Return the icon for *mode*, or an empty string for an unknown mode."""
    icon_by_mode = dict([
        ('php+crowdsec', '🛡️'),
        ('php-only', '📝'),
        ('bot+crowdsec', '🤖🛡️'),
        ('bot-only', '🤖'),
    ])
    try:
        return icon_by_mode[mode]
    except KeyError:
        return ''
def get_mode_description(mode):
    """Return the display text for *mode*; an unknown mode is returned unchanged."""
    text_by_mode = dict([
        ('php+crowdsec', 'GeoIP + CrowdSec'),
        ('php-only', 'Nur GeoIP'),
        ('bot+crowdsec', 'Bot-Block + CrowdSec'),
        ('bot-only', 'Nur Bot-Block'),
    ])
    try:
        return text_by_mode[mode]
    except KeyError:
        return mode
def is_bot_only_mode(mode):
    """Return True for the two modes that block bots only ('bot-only', 'bot+crowdsec')."""
    bot_modes = ('bot-only', 'bot+crowdsec')
    return mode in bot_modes
def uses_crowdsec(mode):
    """Return True for the two modes that report to CrowdSec ('php+crowdsec', 'bot+crowdsec')."""
    crowdsec_modes = ('php+crowdsec', 'bot+crowdsec')
    return mode in crowdsec_modes
def get_direct_shops(available_shops):
    """Return the shops from *available_shops* that check_link11() reports as not behind Link11."""
    return [
        shop
        for shop in available_shops
        if not check_link11(shop)['is_link11']
    ]
def get_link11_shops(available_shops):
    """Return the shops from *available_shops* that check_link11() reports as behind Link11."""
    return [
        shop
        for shop in available_shops
        if check_link11(shop)['is_link11']
    ]
# =============================================================================
# MAIN FUNCTIONS
# =============================================================================
def activate_blocking(shop, silent=False, mode="php+crowdsec", geo_region="dach"):
    """Activate GeoIP or bot blocking for one shop.

    Steps: install the watcher service when this is the first
    CrowdSec-enabled shop, write the blocking script, back up and patch
    index.php so that it requires the blocking script, pre-generate the IP
    range cache (GeoIP modes only) and register the shop.

    The blocking script is written before index.php is patched: the inserted
    ``require_once`` must never point at a file that does not exist yet,
    otherwise requests arriving in between fail with a PHP fatal error.
    index.php is also read before anything is modified, so an unreadable
    file aborts the activation without leaving a backup behind.

    Args:
        shop: Name of the shop's directory below VHOSTS_DIR.
        silent: Suppress all progress output of this function.
        mode: One of 'php+crowdsec', 'php-only', 'bot+crowdsec', 'bot-only'.
        geo_region: Region id for the GeoIP modes; forced to "none" in the
            bot modes.

    Returns:
        True when blocking was activated, False when a backup already exists
        (blocking already active) or index.php is missing.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    blocking_file = os.path.join(httpdocs, BLOCKING_FILE)

    is_bot_mode = is_bot_only_mode(mode)
    if is_bot_mode:
        geo_region = "none"
    region_info = get_geo_region_info(geo_region)
    min_ranges = MIN_RANGES.get(geo_region, 1000)

    if os.path.isfile(backup_php):
        if not silent:
            print(f"⚠️ Blocking bereits aktiv für {shop}")
        return False
    if not os.path.isfile(index_php):
        if not silent:
            print(f"❌ index.php nicht gefunden")
        return False

    if not silent:
        print(f"\n🔧 Aktiviere {region_info['icon']} {region_info['name']} für: {shop}")
        if is_bot_mode:
            print(f" Modus: Nur Bot-Blocking (weltweit erreichbar)")
        else:
            print(f" Erlaubt: {region_info['description']}")
        print(f" CrowdSec: {'Ja' if uses_crowdsec(mode) else 'Nein'}")
        print("=" * 60)

    # Step 1: Watcher service
    if uses_crowdsec(mode):
        crowdsec_shops = [s for s in get_active_shops() if uses_crowdsec(get_shop_mode(s))]
        if not crowdsec_shops and check_crowdsec():
            if not silent:
                print("\n[1/4] Installiere CrowdSec-Watcher-Service...")
            install_watcher_service()
        elif not silent:
            print("\n[1/4] CrowdSec-Watcher-Service bereits aktiv")
    elif not silent:
        print("\n[1/4] CrowdSec-Synchronisation deaktiviert")

    # Step 2: PHP blocking
    if not silent:
        print("\n[2/4] Aktiviere PHP-Blocking...")

    # 2a: read index.php first; nothing has been changed if this fails.
    with open(index_php, 'r', encoding='utf-8') as f:
        content = f.read()

    # 2b: render and write the blocking script.
    expiry = datetime.now() + timedelta(hours=72)
    if is_bot_mode:
        if uses_crowdsec(mode):
            template = BOT_ONLY_SCRIPT_TEMPLATE
        else:
            template = BOT_ONLY_SCRIPT_TEMPLATE_NO_CROWDSEC
        geoip_content = template.format(
            expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
            expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
            log_file=LOG_FILE,
            crowdsec_queue=CROWDSEC_QUEUE_FILE,
            shop_name=shop,
            bot_patterns=generate_php_bot_patterns()
        )
    else:
        countries_array = generate_php_countries_array(geo_region)
        template = GEOIP_SCRIPT_TEMPLATE if uses_crowdsec(mode) else GEOIP_SCRIPT_TEMPLATE_PHP_ONLY
        geoip_content = template.format(
            region_name=region_info['name'],
            region_description=region_info['description'],
            expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
            expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
            cache_file=CACHE_FILE,
            log_file=LOG_FILE,
            crowdsec_queue=CROWDSEC_QUEUE_FILE,
            shop_name=shop,
            countries_array=countries_array,
            min_ranges=min_ranges
        )
    with open(blocking_file, 'w', encoding='utf-8') as f:
        f.write(geoip_content)

    # 2c: back up index.php and insert the require statement. It goes right
    # after the declare(strict_types...) line when there is one, otherwise
    # right after the first line containing "<?php".
    shutil.copy2(index_php, backup_php)
    lines = content.split('\n')
    insert_line = 0
    for i, line in enumerate(lines):
        if 'declare(strict_types' in line:
            insert_line = i + 1
            break
        elif '<?php' in line and insert_line == 0:
            insert_line = i + 1
    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    if require_statement not in content:
        lines.insert(insert_line, require_statement)
    with open(index_php, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines))
    if not silent:
        print(" ✅ PHP-Blocking aktiviert")

    # Step 3: Generate cache (only for GeoIP modes)
    if is_bot_mode:
        if not silent:
            print(f"\n[3/4] Cache-Generierung nicht erforderlich (Bot-Only)")
        range_count = 0
    else:
        if not silent:
            print(f"\n[3/4] Generiere IP-Range-Cache ({len(region_info['countries'])} Länder)...")
        success, range_count, error = generate_and_validate_cache(httpdocs, geo_region)
        if success:
            if not silent:
                print(f" ✅ Cache generiert: {range_count:,} IP-Ranges")
        else:
            if not silent:
                print(f" ⚠️ Cache-Generierung: {error}")
                print(f" Fail-Open aktiv - Cache wird beim ersten Request neu generiert")

    # Step 4: Register
    if not silent:
        print("\n[4/4] Registriere Shop...")
    add_shop_to_active(shop, mode, geo_region)

    if not silent:
        print("\n" + "=" * 60)
        print(f"{region_info['icon']} {region_info['name']} aktiviert")
        print(f" Shop: {shop}")
        print(f" Modus: {get_mode_description(mode)} {get_mode_icon(mode)}")
        if not is_bot_mode:
            print(f" IP-Ranges: {range_count:,}")
            print(f" 🛡️ Fail-Open: Bei Cache-Fehlern wird Traffic durchgelassen")
        else:
            print(f" 🤖 {len(BOT_PATTERNS)} Bot-Patterns aktiv")
        print(f" Gültig bis: {expiry.strftime('%Y-%m-%d %H:%M:%S CET')}")
        print("=" * 60)
    return True
def deactivate_blocking(shop, silent=False):
    """Remove blocking from one shop and clean up everything that belongs to it.

    Steps: restore index.php from its backup (or, without a backup, strip
    every line that mentions the blocking file), delete the blocking script,
    cache, log and queue files, deregister the shop, remove its CrowdSec
    decisions when its mode used CrowdSec, and uninstall the watcher service
    when no CrowdSec-enabled shop remains.

    index.php is read and written as UTF-8, matching how activate_blocking()
    handles the same file; relying on the locale's default encoding could
    fail on or alter a file that activation had written as UTF-8.

    Args:
        shop: Name of the shop's directory below VHOSTS_DIR.
        silent: Suppress the progress output of this function.

    Returns:
        Always True.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')

    # Read the registry before the shop is deregistered further down.
    shop_mode = get_shop_mode(shop)
    shop_geo = get_shop_geo_region(shop)
    region_info = get_geo_region_info(shop_geo)

    if not silent:
        print(f"\n🔧 Deaktiviere {region_info['icon']} {region_info['name']} für: {shop}")
        print("=" * 60)

    # Restore backup
    if not silent:
        print("\n[1/4] PHP-Blocking entfernen...")
    if os.path.isfile(backup_php):
        shutil.move(backup_php, index_php)
    elif os.path.isfile(index_php):
        with open(index_php, 'r', encoding='utf-8') as handle:
            content = handle.read()
        kept_lines = [line for line in content.split('\n') if BLOCKING_FILE not in line]
        with open(index_php, 'w', encoding='utf-8') as handle:
            handle.write('\n'.join(kept_lines))

    # index.php no longer requires the blocking script, so the generated
    # files can be removed now.
    for file_name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE):
        generated_path = os.path.join(httpdocs, file_name)
        if os.path.isfile(generated_path):
            os.remove(generated_path)

    if not silent:
        print("\n[2/4] Deregistriere Shop...")
    remove_shop_from_active(shop)

    if not silent:
        print("\n[3/4] CrowdSec-Decisions entfernen...")
    if uses_crowdsec(shop_mode) and check_crowdsec():
        cleanup_crowdsec_decisions(shop)
    elif not silent:
        print(" Keine CrowdSec-Synchronisation aktiv")

    if not silent:
        print("\n[4/4] Prüfe Watcher-Service...")
    crowdsec_shops = [s for s in get_active_shops() if uses_crowdsec(get_shop_mode(s))]
    if not crowdsec_shops:
        uninstall_watcher_service()
    elif not silent:
        print(f" Service bleibt aktiv ({len(crowdsec_shops)} Shop(s))")

    if not silent:
        print("\n" + "=" * 60)
        print(f"✅ Blocking deaktiviert für: {shop}")
        print("=" * 60)
    return True
def _batch_select_mode_and_region():
    """Ask for the blocking mode and, unless bot-only, the geo region.

    Returns:
        Tuple ``(mode, geo_region, region_info)``; for bot-only mode the
        region is the literal ``"none"``.
    """
    mode = select_mode()
    geo_region = "none" if is_bot_only_mode(mode) else select_geo_region()
    return mode, geo_region, get_geo_region_info(geo_region)


def _batch_render_blocking_script(shop, mode, geo_region, region_info):
    """Return the PHP blocking script for *shop*, rendered from the mode's template.

    The template is chosen by mode (bot-only vs. GeoIP, each with or without
    CrowdSec) and receives an expiry 72 hours from now.
    """
    expiry = datetime.now() + timedelta(hours=72)
    shared_fields = {
        'expiry_date': expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
        'expiry_timestamp': expiry.strftime('%Y-%m-%d %H:%M:%S'),
        'log_file': LOG_FILE,
        'crowdsec_queue': CROWDSEC_QUEUE_FILE,
        'shop_name': shop,
    }
    if is_bot_only_mode(mode):
        if uses_crowdsec(mode):
            template = BOT_ONLY_SCRIPT_TEMPLATE
        else:
            template = BOT_ONLY_SCRIPT_TEMPLATE_NO_CROWDSEC
        return template.format(bot_patterns=generate_php_bot_patterns(), **shared_fields)

    template = GEOIP_SCRIPT_TEMPLATE if uses_crowdsec(mode) else GEOIP_SCRIPT_TEMPLATE_PHP_ONLY
    return template.format(
        region_name=region_info['name'],
        region_description=region_info['description'],
        cache_file=CACHE_FILE,
        countries_array=generate_php_countries_array(geo_region),
        min_ranges=MIN_RANGES.get(geo_region, 1000),
        **shared_fields,
    )


def _batch_inject_require(index_php):
    """Insert the ``require_once`` for BLOCKING_FILE into index.php if it is missing.

    The statement goes on the line after the first ``declare(strict_types``
    line if there is one, otherwise after the first line containing ``<?php``,
    otherwise at the very top.
    """
    with open(index_php, 'r', encoding='utf-8') as f:
        content = f.read()
    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    if require_statement in content:
        return

    lines = content.split('\n')
    insert_line = 0
    for idx, line in enumerate(lines):
        if 'declare(strict_types' in line:
            insert_line = idx + 1
            break
        if '<?php' in line and insert_line == 0:
            insert_line = idx + 1
    lines.insert(insert_line, require_statement)
    with open(index_php, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines))


def _batch_activate_shops(shops, mode, geo_region, region_info):
    """Activate blocking for every shop in *shops* and return how many succeeded.

    Installs the watcher service first when the mode uses CrowdSec, CrowdSec
    is available and no active shop uses CrowdSec yet. Shops that already have
    an index.php backup, or have no index.php at all, are skipped.
    """
    is_bot_mode = is_bot_only_mode(mode)
    print(f"\n{'=' * 60}")
    if uses_crowdsec(mode) and check_crowdsec():
        if not [s for s in get_active_shops() if uses_crowdsec(get_shop_mode(s))]:
            print("\n📦 Installiere CrowdSec-Watcher-Service...")
            install_watcher_service()

    success_count = 0
    for i, shop in enumerate(shops, 1):
        print(f"\n[{i}/{len(shops)}] {shop}")
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        index_php = os.path.join(httpdocs, 'index.php')
        backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
        blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
        if os.path.isfile(backup_php) or not os.path.isfile(index_php):
            print(" ⚠️ Übersprungen")
            continue

        # Write the blocking script BEFORE index.php starts requiring it:
        # a require_once of a missing file is a fatal error in PHP, so the
        # reverse order would briefly break the live shop (and would leave it
        # broken if rendering the template failed).
        geoip_content = _batch_render_blocking_script(shop, mode, geo_region, region_info)
        with open(blocking_file, 'w', encoding='utf-8') as f:
            f.write(geoip_content)

        shutil.copy2(index_php, backup_php)
        _batch_inject_require(index_php)

        if is_bot_mode:
            add_shop_to_active(shop, mode, geo_region)
            print(f" ✅ Aktiviert ({len(BOT_PATTERNS)} Bot-Patterns)")
        else:
            print(" ⏳ Cache generieren...")
            cache_ok, count, _ = generate_and_validate_cache(httpdocs, geo_region)
            add_shop_to_active(shop, mode, geo_region)
            print(f" ✅ Aktiviert ({count:,} Ranges)" if cache_ok else " ⚠️ Aktiviert (Cache-Warnung)")
        success_count += 1
    return success_count


def activate_all_shops():
    """Interactively activate blocking for every shop that is not active yet.

    Lists the candidate shops, asks for mode and (unless bot-only) geo region,
    asks for confirmation, then activates the shops one by one and prints a
    summary.
    """
    # Read the registry once instead of once per available shop.
    active_now = set(get_active_shops())
    available_shops = [s for s in get_available_shops() if s not in active_now]
    if not available_shops:
        print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
        return

    print(f"\n{'=' * 60}")
    print(" Blocking für ALLE Shops aktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 {len(available_shops)} Shop(s):")
    for shop in available_shops:
        print(format_shop_with_link11(shop, prefix=""))

    mode, geo_region, region_info = _batch_select_mode_and_region()
    is_bot_mode = is_bot_only_mode(mode)

    print(f"\n⚠️ Modus: {get_mode_description(mode)} {get_mode_icon(mode)}")
    if not is_bot_mode:
        print(f" Region: {region_info['icon']} {region_info['name']}")
    if input("\nFortfahren? (ja/nein): ").strip().lower() not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    success_count = _batch_activate_shops(available_shops, mode, geo_region, region_info)

    print(f"\n{'=' * 60}")
    print(f"{success_count} Shop(s) aktiviert")
    if not is_bot_mode:
        print(" 🛡️ Fail-Open bei Cache-Fehlern aktiv")
    print(f"{'=' * 60}")


def activate_direct_shops_only():
    """Activate blocking only for shops that are NOT behind Link11 (direct exposure).

    Same flow as activate_all_shops(), but the inactive shops are first split
    into direct shops (activated) and Link11 shops (listed and skipped).
    """
    # Read the registry once instead of once per available shop.
    active_now = set(get_active_shops())
    available_shops = [s for s in get_available_shops() if s not in active_now]
    if not available_shops:
        print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
        return

    # Filter: only shops NOT behind Link11
    direct_shops = get_direct_shops(available_shops)
    link11_shops = get_link11_shops(available_shops)
    if not direct_shops:
        print("\n⚠️ Keine direkten Shops gefunden (alle sind hinter Link11)")
        if link11_shops:
            print(f"\n📋 {len(link11_shops)} Shop(s) hinter Link11 (werden übersprungen):")
            for shop in link11_shops:
                print(f" {COLOR_GREEN}{shop} [Link11]{COLOR_RESET}")
        return

    print(f"\n{'=' * 60}")
    print(" Blocking für DIREKTE Shops aktivieren (ohne Link11)")
    print(f"{'=' * 60}")
    print(f" {COLOR_GREEN}Grün = hinter Link11 (übersprungen){COLOR_RESET}")
    print(f" {COLOR_RED}Rot = Direkt (wird aktiviert){COLOR_RESET}")
    print(f"\n📋 {len(direct_shops)} direkte Shop(s) werden aktiviert:")
    for shop in direct_shops:
        print(f" {COLOR_RED}{shop} [Direkt]{COLOR_RESET}")
    if link11_shops:
        print(f"\n⏭️ {len(link11_shops)} Shop(s) hinter Link11 werden übersprungen:")
        for shop in link11_shops:
            print(f" {COLOR_GREEN}{shop} [Link11]{COLOR_RESET}")

    mode, geo_region, region_info = _batch_select_mode_and_region()
    is_bot_mode = is_bot_only_mode(mode)

    print(f"\n⚠️ Modus: {get_mode_description(mode)} {get_mode_icon(mode)}")
    if not is_bot_mode:
        print(f" Region: {region_info['icon']} {region_info['name']}")
    print(f" Aktiviert: {len(direct_shops)} direkte Shop(s)")
    print(f" Übersprungen: {len(link11_shops)} Link11-Shop(s)")
    if input("\nFortfahren? (ja/nein): ").strip().lower() not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    success_count = _batch_activate_shops(direct_shops, mode, geo_region, region_info)

    print(f"\n{'=' * 60}")
    print(f"{success_count} direkte Shop(s) aktiviert")
    print(f" ⏭️ {len(link11_shops)} Link11-Shop(s) übersprungen")
    if not is_bot_mode:
        print(" 🛡️ Fail-Open bei Cache-Fehlern aktiv")
    print(f"{'=' * 60}")
def deactivate_all_shops():
    """Interactively deactivate blocking for every active shop.

    Lists the active shops, asks for confirmation, then for each shop restores
    index.php, deletes the generated files, deregisters the shop and (when
    CrowdSec is available) removes its CrowdSec decisions. Finally the watcher
    service is uninstalled.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    print(f"\n{'=' * 60}")
    print(" Blocking für ALLE deaktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 {len(active_shops)} Shop(s):")
    for shop in active_shops:
        region_info = get_geo_region_info(get_shop_geo_region(shop))
        mode_icon = get_mode_icon(get_shop_mode(shop))
        print(f"{format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")

    if input("\nFortfahren? (ja/nein): ").strip().lower() not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    # The availability check does not depend on the shop: do it once.
    crowdsec_available = check_crowdsec()

    for i, shop in enumerate(active_shops, 1):
        print(f"\n[{i}/{len(active_shops)}] {shop}")
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
        index_php = os.path.join(httpdocs, 'index.php')

        if os.path.isfile(backup_php):
            shutil.move(backup_php, index_php)
        elif os.path.isfile(index_php):
            # No backup to restore. The blocking file is deleted just below,
            # so any line still requiring it must go, otherwise index.php
            # would require a missing file. Same fallback as
            # deactivate_blocking().
            with open(index_php, 'r', encoding='utf-8') as src:
                kept_lines = [
                    line for line in src.read().split('\n')
                    if BLOCKING_FILE not in line
                ]
            with open(index_php, 'w', encoding='utf-8') as dst:
                dst.write('\n'.join(kept_lines))

        for name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE):
            artefact = os.path.join(httpdocs, name)
            if os.path.isfile(artefact):
                os.remove(artefact)

        remove_shop_from_active(shop)
        if crowdsec_available:
            cleanup_crowdsec_decisions(shop)
        print(" ✅ Deaktiviert")

    uninstall_watcher_service()
    print(f"\n{'=' * 60}")
    print(" ✅ Alle Shops deaktiviert")
    print(f"{'=' * 60}")
def _parse_log_field(line, label):
    """Return the text following *label* up to the next ' |', or None if *label* is absent."""
    if label not in line:
        return None
    return line.split(label)[1].split(' |')[0].strip()


def get_shop_log_stats(shop):
    """Aggregate the shop's block log into counters.

    Every line of the log counts as one block. Fields are read from
    ``BOT: <name> |``, ``IP: <addr> |`` and ``UA: <user agent> |`` segments
    when present.

    Args:
        shop: Name of the shop's directory below VHOSTS_DIR.

    Returns:
        Tuple ``(php_blocks, ips, bots, activation_time)`` where
        ``php_blocks`` is the number of log lines, ``ips`` maps each IP to
        ``{'count': int, 'ua': str}``, ``bots`` maps a bot name to its hit
        count, and ``activation_time`` is whatever
        ``get_shop_activation_time(shop)`` returns.
    """
    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    php_blocks = 0
    ips = {}
    bots = {}

    if os.path.isfile(log_file):
        # The log contains client-supplied User-Agent strings; replace
        # undecodable bytes instead of letting one odd line abort the stats.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                php_blocks += 1
                # Parse bot-only format: BOT: botname | IP: ...
                detected_bot = _parse_log_field(line, 'BOT: ')
                ip = _parse_log_field(line, 'IP: ')
                ua = _parse_log_field(line, 'UA: ')
                if ua is None:
                    ua = 'Unknown'

                if ip:
                    entry = ips.setdefault(ip, {'count': 0, 'ua': ua})
                    entry['count'] += 1
                    # Keep the first real User-Agent seen for this IP.
                    if ua != 'Unknown' and entry['ua'] == 'Unknown':
                        entry['ua'] = ua

                # Track bot statistics: prefer the name the log recorded,
                # otherwise try to classify the User-Agent.
                if detected_bot:
                    bots[detected_bot] = bots.get(detected_bot, 0) + 1
                elif ua and ua != 'Unknown':
                    bot_name = detect_bot(ua)
                    if bot_name != 'Unbekannt':
                        bots[bot_name] = bots.get(bot_name, 0) + 1

    return php_blocks, ips, bots, get_shop_activation_time(shop)
def show_logs(shop):
    """Print the statistics and the most recent log lines of a single shop.

    Shows runtime and block rate, the cache state (GeoIP modes) or the number
    of bot patterns (bot-only mode), the top 10 bots, the last 30 log lines
    and the top 10 IPs.

    Args:
        shop: Name of the shop's directory below VHOSTS_DIR.
    """
    from collections import deque  # local import keeps this block self-contained

    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    log_file = os.path.join(httpdocs, LOG_FILE)
    shop_mode = get_shop_mode(shop)
    shop_geo = get_shop_geo_region(shop)
    region_info = get_geo_region_info(shop_geo)
    is_bot_mode = is_bot_only_mode(shop_mode)
    blocks, ips, bots, activation_time = get_shop_log_stats(shop)

    if activation_time:
        runtime = (datetime.now() - activation_time).total_seconds() / 60
        req_min = blocks / runtime if runtime > 0 else 0
    else:
        runtime, req_min = 0, 0

    # NOTE(review): the separator and bar literals are empty strings in this
    # copy of the file, so they render nothing - possibly a glyph was lost;
    # confirm against the original.
    print(f"\n{'' * 70}")
    print(f"📊 {shop} | {region_info['icon']} {region_info['name']} {get_mode_icon(shop_mode)}")
    print(f"{'' * 70}")
    print(f"⏱️ Laufzeit: {format_duration(runtime)}")
    print(f"📈 Blocks: {blocks} ({req_min:.1f} req/min)")

    if not is_bot_mode:
        valid, count, err = validate_existing_cache(httpdocs, shop_geo)
        print(f"✅ Cache: {count:,} Ranges" if valid else f"⚠️ Cache: {err}")
    else:
        print(f"🤖 Bot-Patterns: {len(BOT_PATTERNS)} aktiv")

    # Show bot statistics (top 10 by hit count)
    if bots:
        print("\n🤖 Bot-Statistik:")
        for bot_name, count in sorted(bots.items(), key=lambda x: x[1], reverse=True)[:10]:
            bar = "" * min(count // 5, 20) if count > 0 else ""
            print(f" {bot_name}: {count}x {bar}")

    if os.path.isfile(log_file):
        print("\n📝 Letzte 30 Blocks:")
        # A bounded deque keeps only the last 30 lines while streaming the
        # file, instead of loading the whole log into memory. Undecodable
        # bytes (client-supplied User-Agents) are replaced rather than raising.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in deque(f, maxlen=30):
                print(line.rstrip())

    if ips:
        print("\n🔥 Top 10 IPs:")
        for ip, data in sorted(ips.items(), key=lambda x: x[1]['count'], reverse=True)[:10]:
            bot = detect_bot(data['ua'])
            print(f" {ip} ({bot}): {data['count']}x")
def show_all_logs():
    """Show combined logs for all active shops.

    Collects the per-shop statistics from get_shop_log_stats(), merges the
    per-IP and per-bot counters across shops, counts CrowdSec decisions per
    shop, prints the overview and finally waits for Enter.

    Prints a notice and returns immediately when no shop is active.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return
    # NOTE(review): the separator and bar literals in this function are empty
    # strings in this copy of the file, so they render nothing - possibly a
    # glyph was lost; confirm against the original.
    print(f"\n{'' * 70}")
    print(" 📊 GESAMTÜBERSICHT ALLER SHOPS")
    print(f"{'' * 70}")
    print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
    total_php_blocks = 0
    shop_php_stats = {} # shop -> {'blocks': N, 'runtime_minutes': float, 'req_min': float, 'ips': {}, 'bots': {}}
    all_ips = {} # ip -> {'count': N, 'ua': user_agent, 'shops': {shop: count}}
    all_bots = {} # bot_name -> count
    total_minutes = 0  # ends up as the longest runtime of any shop
    # Collect PHP stats
    for shop in active_shops:
        blocks, ips, bots, activation_time = get_shop_log_stats(shop)
        total_php_blocks += blocks
        # Calculate runtime and req/min
        if activation_time:
            runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
            req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
        else:
            runtime_minutes = 0
            req_min = 0
        shop_php_stats[shop] = {
            'blocks': blocks,
            'runtime_minutes': runtime_minutes,
            'req_min': req_min,
            'ips': ips,
            'bots': bots
        }
        if runtime_minutes > total_minutes:
            total_minutes = runtime_minutes
        # Merge this shop's per-IP counters into the global table
        for ip, data in ips.items():
            if ip not in all_ips:
                all_ips[ip] = {'count': 0, 'ua': data['ua'], 'shops': {}}
            all_ips[ip]['count'] += data['count']
            all_ips[ip]['shops'][shop] = data['count']
            # Keep the first real User-Agent seen for this IP
            if data['ua'] != 'Unknown' and all_ips[ip]['ua'] == 'Unknown':
                all_ips[ip]['ua'] = data['ua']
        for bot_name, count in bots.items():
            all_bots[bot_name] = all_bots.get(bot_name, 0) + count
    # Calculate total req/min (all blocks divided by the longest shop runtime)
    total_req_min = total_php_blocks / total_minutes if total_minutes > 0 else 0
    # Get CrowdSec stats
    crowdsec_stats = {}
    if check_crowdsec():
        code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
        if code == 0 and stdout:
            # Skip the first output line (presumably a header row - confirm),
            # then credit each line to the first active shop whose name
            # occurs in it.
            for line in stdout.strip().split('\n')[1:]:
                for shop in active_shops:
                    if shop in line:
                        crowdsec_stats[shop] = crowdsec_stats.get(shop, 0) + 1
                        break
    total_crowdsec = sum(crowdsec_stats.values())
    # Display PHP blocks with req/min and top IP per shop
    print(f"\n📝 Blocks gesamt: {total_php_blocks} (⌀ {total_req_min:.1f} req/min, Laufzeit: {format_duration(total_minutes)})")
    if shop_php_stats:
        for shop in sorted(shop_php_stats.keys()):
            stats = shop_php_stats[shop]
            count = stats['blocks']
            req_min = stats['req_min']
            runtime = stats['runtime_minutes']
            bar = "" * min(int(req_min * 2), 20) if req_min > 0 else ""
            runtime_str = format_duration(runtime) if runtime > 0 else "?"
            # Get geo region icon and mode icon
            geo_region = get_shop_geo_region(shop)
            region_info = get_geo_region_info(geo_region)
            geo_icon = region_info['icon']
            mode_icon = get_mode_icon(get_shop_mode(shop))
            # Color shop name based on Link11 status
            link11_info = check_link11(shop)
            if link11_info['is_link11']:
                shop_colored = f"{COLOR_GREEN}{shop}{COLOR_RESET}"
            else:
                shop_colored = f"{COLOR_RED}{shop}{COLOR_RESET}"
            print(f" ├─ {shop_colored} {geo_icon} {mode_icon}: {count} ({req_min:.1f} req/min, seit {runtime_str}) {bar}")
            # Show top IP for this shop
            shop_ips = stats['ips']
            if shop_ips and count > 0:
                top_ip = max(shop_ips.items(), key=lambda x: x[1]['count'])
                top_ip_addr = top_ip[0]
                top_ip_count = top_ip[1]['count']
                top_ip_ua = top_ip[1]['ua']
                top_ip_bot = detect_bot(top_ip_ua)
                top_ip_req_min = top_ip_count / runtime if runtime > 0 else 0
                # Unrecognised bot: show the (truncated) raw User-Agent instead
                if top_ip_bot == 'Unbekannt':
                    display_name = (top_ip_ua[:40] + '...') if len(top_ip_ua) > 43 else top_ip_ua
                else:
                    display_name = top_ip_bot
                print(f" │ └─➤ Top: {top_ip_addr} ({display_name}) - {top_ip_count}x, {top_ip_req_min:.1f} req/min")
    # Display CrowdSec bans
    print(f"\n🛡️ CrowdSec-Bans gesamt: {total_crowdsec}")
    if crowdsec_stats:
        for shop in sorted(crowdsec_stats.keys()):
            count = crowdsec_stats[shop]
            bar = "" * min(count // 10, 20) if count > 0 else ""
            geo_region = get_shop_geo_region(shop)
            region_info = get_geo_region_info(geo_region)
            geo_icon = region_info['icon']
            mode_icon = get_mode_icon(get_shop_mode(shop))
            link11_info = check_link11(shop)
            if link11_info['is_link11']:
                shop_colored = f"{COLOR_GREEN}{shop}{COLOR_RESET}"
            else:
                shop_colored = f"{COLOR_RED}{shop}{COLOR_RESET}"
            print(f" ├─ {shop_colored} {geo_icon} {mode_icon}: {count} {bar}")
    elif check_crowdsec():
        # CrowdSec is available but no decision line matched an active shop
        print(" └─ Keine aktiven Bans")
    else:
        print(" └─ CrowdSec nicht verfügbar")
    # Display bot statistics
    if all_bots:
        print(f"\n🤖 Bot-Statistik (alle Shops):")
        for bot_name, count in sorted(all_bots.items(), key=lambda x: x[1], reverse=True)[:15]:
            bar = "" * min(count // 5, 20) if count > 0 else ""
            print(f" {bot_name}: {count}x {bar}")
    # Top 50 blocked IPs with bot detection, req/min, and top shop
    if all_ips:
        print(f"\n🔥 Top 50 blockierte IPs (alle Shops):")
        sorted_ips = sorted(all_ips.items(), key=lambda x: x[1]['count'], reverse=True)[:50]
        for ip, data in sorted_ips:
            count = data['count']
            ua = data['ua']
            bot_name = detect_bot(ua)
            shops_data = data['shops']
            # Calculate req/min for this IP (relative to the longest shop runtime)
            ip_req_min = count / total_minutes if total_minutes > 0 else 0
            # Find top shop for this IP
            if shops_data:
                top_shop = max(shops_data.items(), key=lambda x: x[1])
                top_shop_name = top_shop[0]
                top_shop_count = top_shop[1]
                # Shorten long shop names to 25 characters including the ellipsis
                if len(top_shop_name) > 25:
                    top_shop_short = top_shop_name[:22] + '...'
                else:
                    top_shop_short = top_shop_name
                link11_info = check_link11(top_shop_name)
                if link11_info['is_link11']:
                    top_shop_display = f"{COLOR_GREEN}{top_shop_short}{COLOR_RESET}"
                else:
                    top_shop_display = f"{COLOR_RED}{top_shop_short}{COLOR_RESET}"
            else:
                top_shop_display = "?"
                top_shop_count = 0
            # Unrecognised bot: fall back to the (truncated) raw User-Agent
            if bot_name == 'Unbekannt':
                display_name = (ua[:35] + '...') if len(ua) > 38 else ua
                if display_name == 'Unknown':
                    display_name = 'Unbekannt'
            else:
                display_name = bot_name
            bar = "" * min(count // 5, 20) if count > 0 else ""
            print(f" {ip} ({display_name}): {count} ({ip_req_min:.1f} req/min) → {top_shop_display} [{top_shop_count}x] {bar}")
    print(f"\n{'' * 70}")
    input("\nDrücke Enter um fortzufahren...")
def _prompt_menu_number(prompt):
    """Read an integer from the user; report invalid input and return None instead of raising."""
    try:
        return int(input(prompt).strip())
    except ValueError:
        print("❌ Ungültig")
        return None


def main():
    """Run the interactive menu loop until the user chooses to quit.

    Prints the banner with the CrowdSec / watcher-service status, then
    repeatedly shows the menu and dispatches the chosen action.

    Only a non-numeric shop selection is reported as invalid input. Errors
    raised by the actions themselves, and Ctrl-C, propagate to the caller
    instead of being reported as "invalid".
    """
    affirmative = ('ja', 'j', 'yes', 'y')

    print("\n" + "=" * 60)
    print(" GeoIP Shop Blocker Manager v3.3.0")
    print(" 🇩🇪🇦🇹🇨🇭 DACH | 🇪🇺 Eurozone+GB | 🤖 Bot-Only")
    print(" 🛡️ Mit Cache-Validierung und Fail-Open")
    print("=" * 60)
    # NOTE(review): the "ok" status literal is an empty string in this copy of
    # the file - possibly a glyph was lost; confirm against the original.
    print(f" {'' if check_crowdsec() else '⚠️ '} CrowdSec")
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    print(f" {'' if code == 0 and stdout.strip() == 'active' else '⚠️ '} Watcher-Service")

    while True:
        print("\n" + "-" * 40)
        print("[1] Aktivieren (einzeln)")
        print("[2] Deaktivieren (einzeln)")
        print("[3] Logs anzeigen")
        print("[4] Status")
        print("-" * 40)
        print("[5] 🚀 ALLE aktivieren")
        print("[6] 🛑 ALLE deaktivieren")
        print(f"[7] {COLOR_RED}🎯 Nur DIREKTE aktivieren (ohne Link11){COLOR_RESET}")
        print("-" * 40)
        print("[0] Beenden")
        choice = input("\nWähle: ").strip()

        if choice == "1":
            # Read the registry once instead of once per available shop.
            active_now = set(get_active_shops())
            available = [s for s in get_available_shops() if s not in active_now]
            if not available:
                print("\n⚠️ Keine Shops verfügbar")
                continue
            print("\n📋 Verfügbare Shops:")
            for i, shop in enumerate(available, 1):
                print(format_shop_with_link11(shop, show_index=i))
            number = _prompt_menu_number("\nShop wählen: ")
            if number is None or not 1 <= number <= len(available):
                continue
            selected = available[number - 1]
            mode = select_mode()
            if is_bot_only_mode(mode):
                geo = "none"
            else:
                geo = select_geo_region()
            region_info = get_geo_region_info(geo)
            confirm_msg = f"\n{region_info['icon']} {get_mode_description(mode)} aktivieren für '{selected}'? (ja/nein): "
            if input(confirm_msg).strip().lower() in affirmative:
                activate_blocking(selected, mode=mode, geo_region=geo)

        elif choice == "2":
            active = get_active_shops()
            if not active:
                print("\n⚠️ Keine aktiven Shops")
                continue
            print("\n📋 Aktive Shops:")
            for i, shop in enumerate(active, 1):
                region_info = get_geo_region_info(get_shop_geo_region(shop))
                mode_icon = get_mode_icon(get_shop_mode(shop))
                print(f" [{i}] {format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")
            number = _prompt_menu_number("\nShop wählen: ")
            if number is None or not 1 <= number <= len(active):
                continue
            selected = active[number - 1]
            if input(f"\nDeaktivieren für '{selected}'? (ja/nein): ").strip().lower() in affirmative:
                deactivate_blocking(selected)

        elif choice == "3":
            active = get_active_shops()
            if not active:
                print("\n⚠️ Keine aktiven Shops")
                continue
            print("\n📋 Logs für:")
            print(" [0] 📊 ALLE")
            for i, shop in enumerate(active, 1):
                region_info = get_geo_region_info(get_shop_geo_region(shop))
                mode_icon = get_mode_icon(get_shop_mode(shop))
                print(f" [{i}] {format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")
            number = _prompt_menu_number("\nWähle: ")
            if number is None:
                continue
            if number == 0:
                show_all_logs()
            elif 1 <= number <= len(active):
                show_logs(active[number - 1])

        elif choice == "4":
            shops = get_available_shops()
            active = get_active_shops()
            print(f"\n📊 {len(active)}/{len(shops)} Shops aktiv")
            for shop in active:
                shop_geo = get_shop_geo_region(shop)
                region_info = get_geo_region_info(shop_geo)
                shop_mode = get_shop_mode(shop)
                mode_icon = get_mode_icon(shop_mode)
                blocks, _, _, activation_time = get_shop_log_stats(shop)
                runtime = (datetime.now() - activation_time).total_seconds() / 60 if activation_time else 0
                httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
                print(f" {format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")
                if is_bot_only_mode(shop_mode):
                    print(f" {blocks} blocks, {format_duration(runtime)}, {len(BOT_PATTERNS)} Bot-Patterns")
                else:
                    valid, count, _ = validate_existing_cache(httpdocs, shop_geo)
                    cache_str = f"{count:,}" if valid else "⚠️"
                    print(f" {blocks} blocks, {format_duration(runtime)}, Cache: {cache_str}")

        elif choice == "5":
            activate_all_shops()
        elif choice == "6":
            deactivate_all_shops()
        elif choice == "7":
            activate_direct_shops_only()
        elif choice == "0":
            print("\n👋 Auf Wiedersehen!")
            break
if __name__ == "__main__":
    # Script entry point: refuse to start unless the effective UID is 0.
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        print("❌ Als root ausführen!")
        sys.exit(1)

    # Ctrl-C anywhere inside the menu ends the program with a farewell
    # message and exit status 0.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 Abgebrochen")
        sys.exit(0)