Files
geoip_shop_manager/geoip_shop_manager.py

1193 lines
43 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GeoIP Shop Blocker Manager - DACH & Eurozone Version
2-Component System: PHP blocking + Python watcher (systemd service)
Supports two geo regions:
- DACH: Germany, Austria, Switzerland (3 countries)
- Eurozone+GB: All Eurozone countries + GB + CH (22 countries)
v3.1.0: Cache validation on activation + fail-open in PHP
"""
import os
import sys
import shutil
import subprocess
import json
import time
import re
import socket
from datetime import datetime, timedelta
from pathlib import Path
# ANSI escape sequences used to colorize terminal output.
COLOR_GREEN = "\033[92m"
COLOR_RED = "\033[91m"
COLOR_YELLOW = "\033[93m"
COLOR_BLUE = "\033[94m"
COLOR_RESET = "\033[0m"
COLOR_BOLD = "\033[1m"
# IP address a shop domain must resolve to in order to be labelled "[Link11]"
# (compared against the DNS result in check_link11).
LINK11_IP = "128.65.223.106"
# Per-process cache for DNS lookups (to avoid repeated lookups);
# maps domain -> {'is_link11': bool, 'ip': str}.
DNS_CACHE = {}
# Configuration: vhost root and the names of the per-shop files that are
# created inside each shop's httpdocs directory.
VHOSTS_DIR = "/var/www/vhosts"
BACKUP_SUFFIX = ".geoip_backup"
BLOCKING_FILE = "geoip_blocking.php"
CACHE_FILE = "geoip_ip_ranges.cache"
LOG_FILE = "geoip_blocked.log"
CROWDSEC_QUEUE_FILE = "geoip_crowdsec_queue.log"
# Locations of the watcher script, its systemd unit, and the JSON registry of
# activated shops (shop -> activation time, expiry, mode, geo region).
WATCHER_SCRIPT = "/usr/local/bin/geoip_crowdsec_watcher.py"
SYSTEMD_SERVICE = "/etc/systemd/system/geoip-crowdsec-watcher.service"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
# Minimum expected IP ranges per region (for validation); a cache with fewer
# entries is treated as invalid. Unknown regions fall back to 1000 at the call sites.
MIN_RANGES = {
"dach": 1000, # DE+AT+CH should have at least 1000 ranges
"eurozone": 5000 # 22 countries should have at least 5000 ranges
}
# =============================================================================
# GEO REGIONS
# =============================================================================
# Supported geo regions, keyed by the region id stored in the active-shops file.
# Each entry provides:
#   name        - display name
#   countries   - lowercase two-letter country codes that are ALLOWED through
#   description - human-readable list of the allowed countries (German UI text)
#   icon        - flag emoji shown in menus and reports
#   short       - short label
GEO_REGIONS = {
"dach": {
"name": "DACH",
"countries": ["de", "at", "ch"],
"description": "Deutschland, Österreich, Schweiz",
"icon": "🇩🇪🇦🇹🇨🇭",
"short": "DACH"
},
"eurozone": {
"name": "Eurozone + GB",
"countries": [
"de", "at", "ch", "be", "cy", "ee", "es", "fi", "fr", "gb",
"gr", "hr", "ie", "it", "lt", "lu", "lv", "mt", "nl", "pt", "si", "sk"
],
"description": "22 Länder: DE, AT, CH, BE, CY, EE, ES, FI, FR, GB, GR, HR, IE, IT, LT, LU, LV, MT, NL, PT, SI, SK",
"icon": "🇪🇺",
"short": "EU+"
}
}
# =============================================================================
# BOT DETECTION
# =============================================================================
# Display name -> regular expression matched against a User-Agent string.
# detect_bot() walks this dict in insertion order and returns the first name
# whose pattern matches, so more specific patterns must come before broader ones.
# Patterns are applied with re.IGNORECASE, which makes character classes such
# as [Bb] redundant but harmless.
BOT_PATTERNS = {
'GPTBot': r'GPTBot', 'OAI-SearchBot': r'OAI-SearchBot', 'ChatGPT-User': r'ChatGPT-User',
'ClaudeBot': r'ClaudeBot', 'Claude-User': r'Claude-User', 'anthropic-ai': r'anthropic-ai',
'Googlebot': r'Googlebot', 'Google-Extended': r'Google-Extended', 'AdsBot-Google': r'AdsBot-Google',
'Bingbot': r'[Bb]ingbot', 'BingPreview': r'BingPreview', 'msnbot': r'msnbot',
'PerplexityBot': r'PerplexityBot', 'Applebot': r'Applebot', 'Amazonbot': r'Amazonbot',
'FacebookBot': r'facebookexternalhit|FacebookBot', 'Bytespider': r'Bytespider',
'DuckDuckBot': r'DuckDuckBot', 'YandexBot': r'YandexBot', 'Baiduspider': r'Baiduspider',
'AhrefsBot': r'AhrefsBot', 'SemrushBot': r'SemrushBot', 'MJ12bot': r'MJ12bot',
'DotBot': r'DotBot', 'PetalBot': r'PetalBot', 'DataForSeoBot': r'DataForSeoBot',
'LinkedInBot': r'LinkedInBot', 'Twitterbot': r'Twitterbot', 'Slackbot': r'Slackbot',
'UptimeRobot': r'UptimeRobot', 'Pingdom': r'Pingdom', 'curl': r'^curl/',
'python-requests': r'python-requests', 'Wget': r'Wget', 'Scrapy': r'Scrapy',
}
def detect_bot(user_agent):
    """Map a User-Agent string to the name of a known bot.

    Returns the key of the first ``BOT_PATTERNS`` entry whose regex matches
    *user_agent* (case-insensitively). Returns ``'Unbekannt'`` when the
    User-Agent is empty, is the placeholder ``'Unknown'``, or matches nothing.
    """
    fallback = 'Unbekannt'
    if user_agent and user_agent != 'Unknown':
        hits = (
            name
            for name, regex in BOT_PATTERNS.items()
            if re.search(regex, user_agent, re.IGNORECASE)
        )
        # First match wins; the generator stops at the first hit.
        return next(hits, fallback)
    return fallback
def check_link11(domain):
    """Resolve *domain* and report whether it points at the Link11 IP.

    Results, including failed lookups, are memoised in ``DNS_CACHE`` so each
    domain is resolved at most once per process.

    Returns:
        dict with ``'is_link11'`` (bool) and ``'ip'`` (the resolved address,
        or ``'N/A'`` when the name could not be resolved).
    """
    cached = DNS_CACHE.get(domain)
    if cached is not None:
        return cached
    try:
        ip = socket.gethostbyname(domain)
    except (OSError, UnicodeError):
        # socket.gaierror/herror are OSError subclasses. UnicodeError comes
        # from the IDNA encoding step for names with empty or over-long
        # labels, which directory-derived shop names can have.
        result = {'is_link11': False, 'ip': 'N/A'}
    else:
        result = {'is_link11': ip == LINK11_IP, 'ip': ip}
    DNS_CACHE[domain] = result
    return result
def format_shop_with_link11(shop, prefix="", show_index=None):
    """Return *shop* as a colored label tagged with its Link11 status.

    Shops resolving to the Link11 IP are shown green with ``[Link11]``; all
    others red with ``[Direkt]``. When *show_index* is given the label is
    prefixed with `` [<index>] `` and *prefix* is ignored; otherwise *prefix*
    is prepended verbatim.
    """
    if check_link11(shop)['is_link11']:
        label = f"{COLOR_GREEN}{shop} [Link11]{COLOR_RESET}"
    else:
        label = f"{COLOR_RED}{shop} [Direkt]{COLOR_RESET}"
    lead = prefix if show_index is None else f" [{show_index}] "
    return f"{lead}{label}"
def get_geo_region_info(geo_region):
    """Return the ``GEO_REGIONS`` entry for *geo_region*, defaulting to DACH."""
    try:
        return GEO_REGIONS[geo_region]
    except KeyError:
        # Unknown region ids fall back to the DACH definition.
        return GEO_REGIONS["dach"]
def generate_php_countries_array(geo_region):
    """Render the region's country codes as the inside of a PHP array literal.

    Example result for DACH: ``'de', 'at', 'ch'`` (without enclosing brackets).
    """
    codes = get_geo_region_info(geo_region)["countries"]
    return ", ".join(map("'{}'".format, codes))
# =============================================================================
# CACHE VALIDATION
# =============================================================================
def generate_and_validate_cache(httpdocs_path, geo_region):
    """Download the allowed IP ranges via PHP and write the shop's cache file.

    A short PHP CLI script fetches the aggregated zone file of every country
    in the region from ipdeny.com and, if at least ``MIN_RANGES[geo_region]``
    ranges were collected, serializes them to ``<httpdocs>/<CACHE_FILE>``.

    The generator script is written to a private temporary file outside the
    document root, so it is never reachable over HTTP while it exists. The
    cache path is passed as ``$argv[1]`` instead of being pasted into PHP
    source, so special characters in the path cannot alter the script.

    Args:
        httpdocs_path: The shop's document root.
        geo_region: Key into ``GEO_REGIONS`` / ``MIN_RANGES``.

    Returns:
        ``(success, range_count, error_message)``; ``error_message`` is
        ``None`` on success.
    """
    import tempfile  # function-scope import; not in the file's import block

    cache_file = os.path.join(httpdocs_path, CACHE_FILE)
    countries = get_geo_region_info(geo_region)["countries"]
    min_expected = MIN_RANGES.get(geo_region, 1000)
    # json.dumps of a list of strings is also a valid PHP short-array literal.
    php_script = f'''<?php
$cache_file = $argv[1];
$countries = {json.dumps(countries)};
$ranges = [];
foreach ($countries as $country) {{
    $url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
    $ctx = stream_context_create(['http' => ['timeout' => 30]]);
    $content = @file_get_contents($url, false, $ctx);
    if ($content !== false) {{
        $lines = explode("\\n", trim($content));
        foreach ($lines as $line) {{
            $line = trim($line);
            if (!empty($line) && strpos($line, '/') !== false) {{
                $ranges[] = $line;
            }}
        }}
    }}
}}
if (count($ranges) < {min_expected}) {{
    echo "FAIL:" . count($ranges);
}} elseif (file_put_contents($cache_file, serialize($ranges)) === false) {{
    echo "WRITE_ERROR:" . $cache_file;
}} else {{
    echo "OK:" . count($ranges);
}}
'''
    temp_php = None
    try:
        fd, temp_php = tempfile.mkstemp(prefix='geoip_cache_gen_', suffix='.php')
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(php_script)
        result = subprocess.run(
            ['php', temp_php, cache_file],
            capture_output=True, text=True, timeout=120,
        )
        output = result.stdout.strip()
        if output.startswith('OK:'):
            range_count = int(output.split(':')[1])
            # Best effort: give the cache the same owner as the document root
            # so the blocking script can refresh it later.
            # NOTE(review): assumes web PHP runs as the httpdocs owner - confirm.
            try:
                docroot_stat = os.stat(httpdocs_path)
                os.chown(cache_file, docroot_stat.st_uid, docroot_stat.st_gid)
            except OSError:
                pass
            return True, range_count, None
        if output.startswith('FAIL:'):
            found = output.split(':')[1]
            return False, int(found), f"Nur {found} Ranges (min. {min_expected} erwartet)"
        # Covers WRITE_ERROR as well as PHP warnings or an empty output.
        return False, 0, f"Unerwartete Ausgabe: {output}"
    except subprocess.TimeoutExpired:
        return False, 0, "Timeout beim Laden der IP-Ranges"
    except Exception as e:
        # Report every other failure (php missing, unparsable count, ...) to
        # the caller instead of raising; activation continues fail-open.
        return False, 0, str(e)
    finally:
        if temp_php and os.path.exists(temp_php):
            os.remove(temp_php)
def validate_existing_cache(httpdocs_path, geo_region):
    """Check that the shop's existing IP-range cache is readable and large enough.

    The cache is a PHP-serialized array, so it is decoded by a PHP one-liner.
    ``php -r`` expects code *without* an opening ``<?php`` tag, and the cache
    path is passed as ``$argv[1]`` (after ``--``) rather than being embedded
    in the PHP source.

    Args:
        httpdocs_path: The shop's document root.
        geo_region: Key into ``MIN_RANGES`` (unknown regions require 1000).

    Returns:
        ``(valid, range_count, error_message)``; ``error_message`` is ``None``
        when the cache is valid.
    """
    cache_file = os.path.join(httpdocs_path, CACHE_FILE)
    min_expected = MIN_RANGES.get(geo_region, 1000)
    if not os.path.exists(cache_file):
        return False, 0, "Cache-Datei existiert nicht"
    php_code = (
        '$data = @unserialize(@file_get_contents($argv[1])); '
        'if ($data === false || !is_array($data)) { echo "CORRUPT:0"; } '
        'else { echo "OK:" . count($data); }'
    )
    try:
        result = subprocess.run(
            ['php', '-r', php_code, '--', cache_file],
            capture_output=True, text=True, timeout=10,
        )
        output = result.stdout.strip()
        if output.startswith('OK:'):
            count = int(output.split(':')[1])
            if count >= min_expected:
                return True, count, None
            return False, count, f"Nur {count} Ranges (min. {min_expected} erwartet)"
        return False, 0, "Cache-Datei ist korrupt"
    except Exception as e:
        # php missing, timeout, or an unparsable count: report, don't raise.
        return False, 0, str(e)
# =============================================================================
# PHP TEMPLATES WITH FAIL-OPEN
# =============================================================================
# PHP blocking script for "php+crowdsec" mode, rendered with str.format().
# Placeholders: region_name, expiry_date, expiry_timestamp, cache_file,
# log_file, crowdsec_queue, min_ranges, countries_array, shop_name.
# Literal PHP braces are doubled ({{ }}) so str.format() leaves them alone.
# Callers also pass region_description, which this template does not use.
# Behaviour of the rendered script: it does nothing after the expiry time or
# for private/reserved/invalid client addresses; it loads the allowed CIDR
# ranges from the cache (max. 24h old) or downloads them; if fewer than
# min_ranges are available it logs and lets the request through (fail-open);
# otherwise a visitor outside all ranges is logged, appended to the CrowdSec
# queue file and answered with HTTP 403.
# NOTE(review): ip_in_range() is built on ip2long(), which presumably handles
# IPv4 only - confirm how IPv6 visitors are meant to be treated.
GEOIP_SCRIPT_TEMPLATE = '''<?php
/**
* GeoIP Blocking Script - {region_name}
* Valid until: {expiry_date}
* FAIL-OPEN: If cache is corrupt/empty, traffic is allowed through
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) return;
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) return;
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400;
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';
$min_ranges = {min_ranges};
$allowed_countries = [{countries_array}];
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$ctx = stream_context_create(['http' => ['timeout' => 30]]);
$content = @file_get_contents($url, false, $ctx);
if ($content !== false) {{
foreach (explode("\\n", trim($content)) as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) $ranges[] = $line;
}}
}}
}}
return $ranges;
}}
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$mask_long = -1 << (32 - (int)$mask);
return (ip2long($ip) & $mask_long) == (ip2long($subnet) & $mask_long);
}}
$allowed_ranges = [];
$cache_valid = false;
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$cached_data = @file_get_contents($cache_file);
if ($cached_data !== false) {{
$allowed_ranges = @unserialize($cached_data);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
$cache_valid = true;
}} else {{
@unlink($cache_file);
$allowed_ranges = [];
}}
}}
}}
if (!$cache_valid) {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
$cache_valid = true;
}} else {{
error_log("GeoIP FAIL-OPEN: Could not load valid IP ranges (got " . count($allowed_ranges) . ", need $min_ranges)");
return;
}}
}}
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{ $is_allowed = true; break; }}
}}
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$ua = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] IP: $visitor_ip | UA: $ua | URI: $uri\\n", FILE_APPEND | LOCK_EX);
@file_put_contents($crowdsec_queue, "$timestamp|$visitor_ip|{shop_name}\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# PHP blocking script for "php-only" mode, rendered with str.format().
# Placeholders: region_name, expiry_date, expiry_timestamp, cache_file,
# log_file, min_ranges, countries_array. Callers additionally pass
# crowdsec_queue, shop_name and region_description, which are not used here.
# Literal PHP braces are doubled ({{ }}) so str.format() leaves them alone.
# Same logic as GEOIP_SCRIPT_TEMPLATE, except that a blocked visitor is only
# written to the log file and is NOT appended to a CrowdSec queue.
# NOTE(review): ip_in_range() is built on ip2long(), which presumably handles
# IPv4 only - confirm how IPv6 visitors are meant to be treated.
GEOIP_SCRIPT_TEMPLATE_PHP_ONLY = '''<?php
/**
* GeoIP Blocking Script (PHP-only) - {region_name}
* Valid until: {expiry_date}
* FAIL-OPEN: If cache is corrupt/empty, traffic is allowed through
*/
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) return;
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) return;
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) return;
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400;
$log_file = __DIR__ . '/{log_file}';
$min_ranges = {min_ranges};
$allowed_countries = [{countries_array}];
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$ctx = stream_context_create(['http' => ['timeout' => 30]]);
$content = @file_get_contents($url, false, $ctx);
if ($content !== false) {{
foreach (explode("\\n", trim($content)) as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) $ranges[] = $line;
}}
}}
}}
return $ranges;
}}
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$mask_long = -1 << (32 - (int)$mask);
return (ip2long($ip) & $mask_long) == (ip2long($subnet) & $mask_long);
}}
$allowed_ranges = [];
$cache_valid = false;
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$cached_data = @file_get_contents($cache_file);
if ($cached_data !== false) {{
$allowed_ranges = @unserialize($cached_data);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
$cache_valid = true;
}} else {{
@unlink($cache_file);
$allowed_ranges = [];
}}
}}
}}
if (!$cache_valid) {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (is_array($allowed_ranges) && count($allowed_ranges) >= $min_ranges) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
$cache_valid = true;
}} else {{
error_log("GeoIP FAIL-OPEN: Could not load valid IP ranges (got " . count($allowed_ranges) . ", need $min_ranges)");
return;
}}
}}
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{ $is_allowed = true; break; }}
}}
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$ua = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$uri = $_SERVER['REQUEST_URI'] ?? '/';
@file_put_contents($log_file, "[$timestamp] IP: $visitor_ip | UA: $ua | URI: $uri\\n", FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# =============================================================================
# WATCHER SERVICE
# =============================================================================
# Source of the stand-alone watcher that install_watcher_service() writes to
# WATCHER_SCRIPT verbatim (no str.format(), so braces are literal here).
# The watcher polls the queue file of every registered shop every
# CHECK_INTERVAL seconds and bans each queued IP in CrowdSec for 72h.
# The embedded code must itself be valid, indented Python. Bare "except:"
# clauses were narrowed so SystemExit is no longer swallowed by the main loop.
WATCHER_SCRIPT_CONTENT = '''#!/usr/bin/env python3
import os, sys, time, subprocess, json
from datetime import datetime

VHOSTS_DIR = "/var/www/vhosts"
QUEUE_FILE = "geoip_crowdsec_queue.log"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
PROCESSED_IPS = {}
CHECK_INTERVAL = 5


def log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)


def get_active_shops():
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return {}
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f)
    except (OSError, ValueError):
        # Unreadable or half-written registry: treat as "no shops" this round.
        return {}


def add_to_crowdsec(ip, shop):
    now = time.time()
    # An IP banned within the last hour counts as handled without a new call.
    if ip in PROCESSED_IPS and (now - PROCESSED_IPS[ip]) < 3600:
        return True
    try:
        result = subprocess.run(
            ['cscli', 'decisions', 'add', '--ip', ip, '--duration', '72h',
             '--type', 'ban', '--reason', f'GeoIP: blocked by {shop}'],
            capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.SubprocessError):
        return False
    if result.returncode == 0:
        PROCESSED_IPS[ip] = now
        log(f"✅ Added {ip} to CrowdSec (from {shop})")
        return True
    return False


def process_queue_file(shop_path, shop):
    queue_file = os.path.join(shop_path, 'httpdocs', QUEUE_FILE)
    if not os.path.isfile(queue_file):
        return 0
    processed = 0
    try:
        with open(queue_file, 'r') as f:
            lines = f.readlines()
        if not lines:
            return 0
        for line in lines:
            # Queue line format: timestamp|ip|shop
            parts = line.strip().split('|')
            if len(parts) >= 2 and add_to_crowdsec(parts[1], shop):
                processed += 1
        if processed > 0:
            # Truncate the queue once at least one entry was handled.
            with open(queue_file, 'w') as f:
                f.write('')
    except (OSError, UnicodeError):
        pass
    return processed


def main():
    log("🚀 GeoIP CrowdSec Watcher started")
    while True:
        try:
            active_shops = get_active_shops()
            for shop in active_shops:
                shop_path = os.path.join(VHOSTS_DIR, shop)
                if os.path.isdir(shop_path):
                    process_queue_file(shop_path, shop)
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            break
        except Exception:
            # Keep the service alive on unexpected errors; retry next round.
            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
'''
# systemd unit written verbatim to SYSTEMD_SERVICE by install_watcher_service().
# It runs the watcher script with /usr/bin/python3, is ordered after
# network.target and crowdsec.service, and is restarted 10 seconds after any exit.
SYSTEMD_SERVICE_CONTENT = '''[Unit]
Description=GeoIP CrowdSec Watcher Service
After=network.target crowdsec.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/geoip_crowdsec_watcher.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
'''
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def run_command(cmd, capture_output=True):
    """Run *cmd* through the shell with a 30-second timeout.

    Args:
        cmd: Shell command line (pipes and redirections are allowed).
        capture_output: When true, stdout/stderr are captured as text;
            otherwise the child inherits the terminal.

    Returns:
        ``(returncode, stdout, stderr)``. stdout/stderr are empty strings
        when nothing was captured. On any exception (timeout included) the
        result is ``(-1, "", <error text>)``.
    """
    try:
        if not capture_output:
            completed = subprocess.run(cmd, shell=True, timeout=30)
            return completed.returncode, "", ""
        completed = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=30
        )
    except Exception as exc:
        return -1, "", str(exc)
    return completed.returncode, completed.stdout, completed.stderr
def check_crowdsec():
    """Return True when systemd reports the ``crowdsec`` unit as active."""
    exit_code, output, _ = run_command("systemctl is-active crowdsec")
    if exit_code != 0:
        return False
    return output.strip() == "active"
def install_watcher_service():
    """Write the watcher script and its systemd unit, then enable and start it.

    Returns:
        True when the unit reports ``active`` one second after the start
        command, otherwise False.
    """
    print(" 📦 Installiere CrowdSec-Watcher-Service...")
    with open(WATCHER_SCRIPT, 'w') as script_fh:
        script_fh.write(WATCHER_SCRIPT_CONTENT)
    os.chmod(WATCHER_SCRIPT, 0o755)
    with open(SYSTEMD_SERVICE, 'w') as unit_fh:
        unit_fh.write(SYSTEMD_SERVICE_CONTENT)

    unit = "geoip-crowdsec-watcher.service"
    for systemctl_args in ("daemon-reload", f"enable {unit}", f"start {unit}"):
        run_command(f"systemctl {systemctl_args}")

    # Give systemd a moment before asking whether the unit came up.
    time.sleep(1)
    status_code, status_out, _ = run_command(f"systemctl is-active {unit}")
    running = status_code == 0 and status_out.strip() == "active"
    if not running:
        print(" ⚠️ Service konnte nicht gestartet werden")
        return False
    print(" ✅ Service gestartet")
    return True
def uninstall_watcher_service():
    """Stop and disable the watcher unit and delete its unit file and script."""
    print(" 📦 Deinstalliere CrowdSec-Watcher-Service...")
    for verb in ("stop", "disable"):
        run_command(f"systemctl {verb} geoip-crowdsec-watcher.service")
    for path in (SYSTEMD_SERVICE, WATCHER_SCRIPT):
        if os.path.isfile(path):
            os.remove(path)
    # Make systemd forget the removed unit file.
    run_command("systemctl daemon-reload")
    print(" ✅ Service deinstalliert")
def add_shop_to_active(shop, mode="php+crowdsec", geo_region="dach"):
    """Register *shop* in the active-shops JSON file.

    Stores the activation time, an expiry 72 hours later, the blocking mode
    and the geo region under the shop's name, replacing any earlier entry.

    An unreadable or corrupt registry is replaced by a fresh one instead of
    raising, because this runs after ``index.php`` has already been patched.
    The file is written to a temporary name and renamed into place so the
    polling watcher never reads a half-written file.

    Args:
        shop: Shop (vhost directory) name.
        mode: ``"php+crowdsec"`` or ``"php-only"``.
        geo_region: Key into ``GEO_REGIONS``.
    """
    os.makedirs(os.path.dirname(ACTIVE_SHOPS_FILE), exist_ok=True)
    shops = {}
    if os.path.isfile(ACTIVE_SHOPS_FILE):
        try:
            with open(ACTIVE_SHOPS_FILE, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            loaded = None
        if isinstance(loaded, dict):
            shops = loaded
    # One timestamp, so "activated" and "expiry" are exactly 72 hours apart.
    now = datetime.now()
    shops[shop] = {
        "activated": now.isoformat(),
        "expiry": (now + timedelta(hours=72)).isoformat(),
        "mode": mode,
        "geo_region": geo_region
    }
    tmp_path = ACTIVE_SHOPS_FILE + ".tmp"
    with open(tmp_path, 'w') as f:
        json.dump(shops, f, indent=2)
    os.replace(tmp_path, ACTIVE_SHOPS_FILE)
def get_shop_mode(shop):
    """Return the blocking mode recorded for *shop*.

    Falls back to ``"php+crowdsec"`` when the registry file is missing,
    unreadable, malformed, or has no entry or mode for the shop.
    """
    default_mode = "php+crowdsec"
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return default_mode
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f).get(shop, {}).get("mode", default_mode)
    except (OSError, ValueError, AttributeError):
        # OSError: unreadable file; ValueError: invalid JSON or encoding;
        # AttributeError: JSON whose structure is not dict-of-dicts.
        return default_mode
def get_shop_geo_region(shop):
    """Return the geo region id recorded for *shop*.

    Falls back to ``"dach"`` when the registry file is missing, unreadable,
    malformed, or has no entry or region for the shop.
    """
    default_region = "dach"
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return default_region
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            return json.load(f).get(shop, {}).get("geo_region", default_region)
    except (OSError, ValueError, AttributeError):
        # OSError: unreadable file; ValueError: invalid JSON or encoding;
        # AttributeError: JSON whose structure is not dict-of-dicts.
        return default_region
def get_shop_activation_time(shop):
    """Return when blocking was activated for *shop*.

    Returns:
        A ``datetime`` parsed from the registry's ISO timestamp, or ``None``
        when the registry file is missing, unreadable or malformed, or holds
        no (valid) activation time for the shop.
    """
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return None
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            activated_str = json.load(f).get(shop, {}).get("activated")
        return datetime.fromisoformat(activated_str) if activated_str else None
    except (OSError, ValueError, AttributeError, TypeError):
        # ValueError also covers a malformed timestamp; TypeError a non-string one.
        return None
def format_duration(minutes):
    """Format a duration given in minutes as a compact string.

    Under one hour the result is ``"<m>m"``, under one day ``"<h>h <m>m"``,
    otherwise ``"<d>d <h>h"``. Fractions are truncated.
    """
    total_hours = minutes / 60
    if minutes < 60:
        text = f"{int(minutes)}m"
    elif total_hours < 24:
        text = f"{int(total_hours)}h {int(minutes % 60)}m"
    else:
        text = f"{int(total_hours / 24)}d {int(total_hours % 24)}h"
    return text
def remove_shop_from_active(shop):
    """Delete *shop* from the active-shops JSON file, if it is listed there.

    A missing, unreadable or corrupt registry is left untouched rather than
    raising, so the remaining deactivation steps still run. The updated file
    is written to a temporary name and renamed into place so the polling
    watcher never reads a half-written file.
    """
    if not os.path.isfile(ACTIVE_SHOPS_FILE):
        return
    try:
        with open(ACTIVE_SHOPS_FILE, 'r') as f:
            shops = json.load(f)
    except (OSError, ValueError):
        return
    if not isinstance(shops, dict) or shop not in shops:
        return
    del shops[shop]
    tmp_path = ACTIVE_SHOPS_FILE + ".tmp"
    with open(tmp_path, 'w') as f:
        json.dump(shops, f, indent=2)
    os.replace(tmp_path, ACTIVE_SHOPS_FILE)
def cleanup_crowdsec_decisions(shop):
    """Delete the CrowdSec decisions that the watcher created for *shop*.

    Lists all decisions in raw (CSV) form and removes, in batches of up to
    100 per pass and at most 50 passes, those whose reason field is exactly
    the watcher's ``"GeoIP: blocked by <shop>"``. Does nothing when CrowdSec
    is not running.

    The shop name is matched in Python as an exact field value, not through
    a shell ``grep``. It is therefore neither interpreted by the shell nor
    as a regex, and another shop whose name merely contains this one is not
    affected. Parsed IPs are checked before they go into a command line.
    """
    if not check_crowdsec():
        return
    print(f" 🔍 Entferne CrowdSec-Decisions für {shop}...")
    reason = f"GeoIP: blocked by {shop}"
    total_removed = 0
    for _attempt in range(50):
        code, stdout, _err = run_command("cscli decisions list -o raw --limit 0")
        if code != 0:
            break
        matching = []
        for line in stdout.splitlines():
            fields = [field.strip().strip('"') for field in line.split(',')]
            if len(fields) >= 3 and reason in fields:
                matching.append(fields)
        if not matching:
            break
        batch_count = 0
        for fields in matching[:100]:
            # Third column holds the target, written as "<scope>:<value>".
            ip_field = fields[2]
            ip = ip_field[3:] if ip_field.lower().startswith('ip:') else ip_field
            # Only plain IPv4/IPv6 literals may reach the shell command below.
            if not re.fullmatch(r'[0-9A-Fa-f:.]+', ip):
                continue
            if run_command(f"cscli decisions delete --ip {ip}")[0] == 0:
                batch_count += 1
        total_removed += batch_count
        if batch_count == 0:
            # Nothing could be deleted; stop instead of looping on the same rows.
            break
    print(f"{total_removed} Decisions entfernt" if total_removed else " Keine Decisions gefunden")
def get_available_shops():
    """Return the sorted names of all vhosts that look like shops.

    A vhost qualifies when it is a directory below ``VHOSTS_DIR``, is not one
    of the reserved names (``chroot``, ``system``, ``default``) and contains
    ``httpdocs/index.php``. Returns an empty list when ``VHOSTS_DIR`` is missing.
    """
    if not os.path.exists(VHOSTS_DIR):
        return []
    reserved = ('chroot', 'system', 'default')
    found = []
    for name in os.listdir(VHOSTS_DIR):
        vhost = os.path.join(VHOSTS_DIR, name)
        if not os.path.isdir(vhost) or name in reserved:
            continue
        docroot = os.path.join(vhost, 'httpdocs')
        if os.path.isdir(docroot) and os.path.isfile(os.path.join(docroot, 'index.php')):
            found.append(name)
    found.sort()
    return found
def get_active_shops():
    """Return the available shops that currently have GeoIP blocking in place.

    A shop counts as active when its httpdocs contains the blocking script
    or the ``index.php`` backup created during activation.
    """
    def _is_active(shop):
        docroot = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        markers = (BLOCKING_FILE, f'index.php{BACKUP_SUFFIX}')
        return any(os.path.isfile(os.path.join(docroot, marker)) for marker in markers)

    return [shop for shop in get_available_shops() if _is_active(shop)]
def select_geo_region():
    """Ask the user for a geo region; returns ``"eurozone"`` or ``"dach"``.

    Any answer other than ``2`` selects DACH.
    """
    dach = GEO_REGIONS['dach']
    eurozone = GEO_REGIONS['eurozone']
    print("\n🌍 Wähle die Geo-Region:")
    print(f" [1] {dach['icon']} DACH - {dach['description']}")
    print(f" [2] {eurozone['icon']} Eurozone+GB - {len(eurozone['countries'])} Länder")
    answer = input("\nRegion wählen [1/2]: ").strip()
    if answer == "2":
        return "eurozone"
    return "dach"
def select_mode():
    """Ask the user for a blocking mode; returns ``"php-only"`` or ``"php+crowdsec"``.

    Any answer other than ``2`` selects PHP + CrowdSec.
    """
    menu = (
        "\n🔧 Wähle den Blocking-Modus:",
        " [1] PHP + CrowdSec (IPs werden an CrowdSec gemeldet)",
        " [2] Nur PHP (keine CrowdSec-Synchronisation)",
    )
    for entry in menu:
        print(entry)
    answer = input("\nModus wählen [1/2]: ").strip()
    if answer == "2":
        return "php-only"
    return "php+crowdsec"
# =============================================================================
# MAIN FUNCTIONS
# =============================================================================
def activate_blocking(shop, silent=False, mode="php+crowdsec", geo_region="dach"):
    """Activate GeoIP blocking for a single shop.

    Steps:
      1. Install the CrowdSec watcher service if this is the first
         ``php+crowdsec`` shop and CrowdSec is running.
      2. Write the blocking script, back up ``index.php`` and insert a
         ``require_once`` for the blocking script into it.
      3. Pre-generate the IP-range cache (a failure is only a warning; the
         PHP script then builds the cache itself or fails open).
      4. Register the shop in the active-shops file.

    The blocking script is written BEFORE ``index.php`` references it, so
    there is no moment in which ``require_once`` points at a missing file.
    ``index.php`` is also read before any file is created, so an unreadable
    ``index.php`` aborts without leaving a backup behind.

    Args:
        shop: Shop (vhost directory) name.
        silent: Suppress this function's own progress output.
        mode: ``"php+crowdsec"`` or ``"php-only"``.
        geo_region: Key into ``GEO_REGIONS``.

    Returns:
        True on success; False when blocking is already active (a backup
        exists) or ``index.php`` is missing.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
    region_info = get_geo_region_info(geo_region)
    min_ranges = MIN_RANGES.get(geo_region, 1000)

    if os.path.isfile(backup_php):
        if not silent:
            print(f"⚠️ GeoIP-Blocking bereits aktiv für {shop}")
        return False
    if not os.path.isfile(index_php):
        if not silent:
            print(f"❌ index.php nicht gefunden")
        return False

    if not silent:
        print(f"\n🔧 Aktiviere {region_info['icon']} {region_info['name']} GeoIP-Blocking für: {shop}")
        print(f" Erlaubt: {region_info['description']}")
        print(f" Modus: {'PHP + CrowdSec' if mode == 'php+crowdsec' else 'Nur PHP'}")
        print("=" * 60)

    # Step 1: Watcher service
    if mode == "php+crowdsec":
        crowdsec_shops = [s for s in get_active_shops() if get_shop_mode(s) == "php+crowdsec"]
        if not crowdsec_shops and check_crowdsec():
            if not silent:
                print("\n[1/4] Installiere CrowdSec-Watcher-Service...")
            install_watcher_service()
        elif not silent:
            print("\n[1/4] CrowdSec-Watcher-Service bereits aktiv")
    elif not silent:
        print("\n[1/4] CrowdSec-Synchronisation deaktiviert (nur PHP-Modus)")

    # Step 2: PHP blocking
    if not silent:
        print("\n[2/4] Aktiviere PHP-Blocking...")
    # Read first: a decoding error must abort before anything is changed.
    with open(index_php, 'r', encoding='utf-8') as f:
        content = f.read()

    expiry = datetime.now() + timedelta(hours=72)
    countries_array = generate_php_countries_array(geo_region)
    template = GEOIP_SCRIPT_TEMPLATE if mode == "php+crowdsec" else GEOIP_SCRIPT_TEMPLATE_PHP_ONLY
    geoip_content = template.format(
        region_name=region_info['name'],
        region_description=region_info['description'],
        expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
        expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
        cache_file=CACHE_FILE,
        log_file=LOG_FILE,
        crowdsec_queue=CROWDSEC_QUEUE_FILE,
        shop_name=shop,
        countries_array=countries_array,
        min_ranges=min_ranges
    )
    # The blocking script must exist before index.php requires it.
    with open(blocking_file, 'w', encoding='utf-8') as f:
        f.write(geoip_content)

    shutil.copy2(index_php, backup_php)
    lines = content.split('\n')
    # Insert after declare(strict_types...) if present, else after the first
    # line containing "<?php", else at the very top.
    insert_line = 0
    for i, line in enumerate(lines):
        if 'declare(strict_types' in line:
            insert_line = i + 1
            break
        elif '<?php' in line and insert_line == 0:
            insert_line = i + 1
    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    if require_statement not in content:
        lines.insert(insert_line, require_statement)
        with open(index_php, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))
    if not silent:
        print(" ✅ PHP-Blocking aktiviert")

    # Step 3: Generate cache
    if not silent:
        print(f"\n[3/4] Generiere IP-Range-Cache ({len(region_info['countries'])} Länder)...")
    success, range_count, error = generate_and_validate_cache(httpdocs, geo_region)
    if success:
        if not silent:
            print(f" ✅ Cache generiert: {range_count:,} IP-Ranges")
    else:
        if not silent:
            print(f" ⚠️ Cache-Generierung: {error}")
            print(f" Fail-Open aktiv - Cache wird beim ersten Request neu generiert")

    # Step 4: Register
    if not silent:
        print("\n[4/4] Registriere Shop...")
    add_shop_to_active(shop, mode, geo_region)

    if not silent:
        print("\n" + "=" * 60)
        print(f"{region_info['icon']} {region_info['name']} GeoIP-Blocking aktiviert")
        print(f" Shop: {shop}")
        print(f" IP-Ranges: {range_count:,}")
        print(f" Gültig bis: {expiry.strftime('%Y-%m-%d %H:%M:%S CET')}")
        print(f" 🛡️ Fail-Open: Bei Cache-Fehlern wird Traffic durchgelassen")
        print("=" * 60)
    return True
def deactivate_blocking(shop, silent=False):
    """Remove GeoIP blocking from a single shop.

    Steps:
      1. Restore ``index.php`` from its backup; without a backup, strip
         every line mentioning the blocking script from ``index.php``. Then
         delete the blocking script, cache, log and CrowdSec queue files.
      2. Remove the shop from the active-shops file.
      3. Delete the shop's CrowdSec decisions (``php+crowdsec`` mode only).
      4. Uninstall the watcher service when no ``php+crowdsec`` shop is left.

    ``index.php`` is restored before the blocking script is deleted, so the
    ``require_once`` never points at a missing file. The fallback path uses
    the same ``utf-8`` encoding that activation wrote the file with.

    Args:
        shop: Shop (vhost directory) name.
        silent: Suppress this function's own progress output.

    Returns:
        Always True.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    index_php = os.path.join(httpdocs, 'index.php')
    backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
    # Read mode and region before the shop is deregistered below.
    shop_mode = get_shop_mode(shop)
    shop_geo = get_shop_geo_region(shop)
    region_info = get_geo_region_info(shop_geo)

    if not silent:
        print(f"\n🔧 Deaktiviere {region_info['icon']} {region_info['name']} für: {shop}")
        print("=" * 60)

    # Restore backup
    if not silent:
        print("\n[1/4] PHP-Blocking entfernen...")
    if os.path.isfile(backup_php):
        shutil.move(backup_php, index_php)
    elif os.path.isfile(index_php):
        with open(index_php, 'r', encoding='utf-8') as f:
            content = f.read()
        kept_lines = [l for l in content.split('\n') if BLOCKING_FILE not in l]
        with open(index_php, 'w', encoding='utf-8') as f:
            f.write('\n'.join(kept_lines))
    for name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE):
        path = os.path.join(httpdocs, name)
        if os.path.isfile(path):
            os.remove(path)

    if not silent:
        print("\n[2/4] Deregistriere Shop...")
    remove_shop_from_active(shop)

    if not silent:
        print("\n[3/4] CrowdSec-Decisions entfernen...")
    if shop_mode == "php+crowdsec" and check_crowdsec():
        cleanup_crowdsec_decisions(shop)
    elif not silent:
        print(" Keine CrowdSec-Synchronisation aktiv")

    if not silent:
        print("\n[4/4] Prüfe Watcher-Service...")
    crowdsec_shops = [s for s in get_active_shops() if get_shop_mode(s) == "php+crowdsec"]
    if not crowdsec_shops:
        uninstall_watcher_service()
    elif not silent:
        print(f" Service bleibt aktiv ({len(crowdsec_shops)} Shop(s))")

    if not silent:
        print("\n" + "=" * 60)
        print(f"✅ GeoIP-Blocking deaktiviert für: {shop}")
        print("=" * 60)
    return True
def activate_all_shops():
    """Interactively activate GeoIP blocking for every shop that is not active yet.

    Lists the candidates, asks for geo region, mode and a confirmation, and
    installs the watcher service once if it is needed. For each shop it then
    writes the blocking script, hooks it into ``index.php`` (keeping a
    backup), generates the IP-range cache and registers the shop.

    As in the single-shop activation, the blocking script is written before
    ``index.php`` requires it, and ``index.php`` is read before any file is
    created for the shop.
    """
    # Scan the active shops once instead of once per candidate.
    already_active = set(get_active_shops())
    available_shops = [s for s in get_available_shops() if s not in already_active]
    if not available_shops:
        print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
        return

    print(f"\n{'=' * 60}")
    print(f" GeoIP-Blocking für ALLE Shops aktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 {len(available_shops)} Shop(s):")
    for shop in available_shops:
        print(format_shop_with_link11(shop, prefix=""))

    geo_region = select_geo_region()
    region_info = get_geo_region_info(geo_region)
    mode = select_mode()
    print(f"\n⚠️ Region: {region_info['icon']} {region_info['name']}")
    print(f" Modus: {'PHP + CrowdSec 🛡️' if mode == 'php+crowdsec' else 'Nur PHP 📝'}")
    if input(f"\nFortfahren? (ja/nein): ").strip().lower() not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    print(f"\n{'=' * 60}")
    if mode == "php+crowdsec" and check_crowdsec():
        if not [s for s in get_active_shops() if get_shop_mode(s) == "php+crowdsec"]:
            print("\n📦 Installiere CrowdSec-Watcher-Service...")
            install_watcher_service()

    require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
    template = GEOIP_SCRIPT_TEMPLATE if mode == "php+crowdsec" else GEOIP_SCRIPT_TEMPLATE_PHP_ONLY
    success_count = 0
    for i, shop in enumerate(available_shops, 1):
        print(f"\n[{i}/{len(available_shops)}] {shop}")
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        index_php = os.path.join(httpdocs, 'index.php')
        backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
        blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
        if os.path.isfile(backup_php) or not os.path.isfile(index_php):
            print(f" ⚠️ Übersprungen")
            continue

        # Read first: a decoding error must abort before anything is changed.
        with open(index_php, 'r', encoding='utf-8') as f:
            content = f.read()

        expiry = datetime.now() + timedelta(hours=72)
        geoip_content = template.format(
            region_name=region_info['name'],
            region_description=region_info['description'],
            expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
            expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
            cache_file=CACHE_FILE,
            log_file=LOG_FILE,
            crowdsec_queue=CROWDSEC_QUEUE_FILE,
            shop_name=shop,
            countries_array=generate_php_countries_array(geo_region),
            min_ranges=MIN_RANGES.get(geo_region, 1000)
        )
        # The blocking script must exist before index.php requires it.
        with open(blocking_file, 'w', encoding='utf-8') as f:
            f.write(geoip_content)

        shutil.copy2(index_php, backup_php)
        lines = content.split('\n')
        # Insert after declare(strict_types...) if present, else after the
        # first line containing "<?php", else at the very top.
        insert_line = 0
        for idx, line in enumerate(lines):
            if 'declare(strict_types' in line:
                insert_line = idx + 1
                break
            elif '<?php' in line and insert_line == 0:
                insert_line = idx + 1
        if require_statement not in content:
            lines.insert(insert_line, require_statement)
            with open(index_php, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines))

        print(f" ⏳ Cache generieren...")
        cache_ok, count, _ = generate_and_validate_cache(httpdocs, geo_region)
        add_shop_to_active(shop, mode, geo_region)
        print(f" ✅ Aktiviert ({count:,} Ranges)" if cache_ok else f" ⚠️ Aktiviert (Cache-Warnung)")
        success_count += 1

    print(f"\n{'=' * 60}")
    print(f"{success_count} Shop(s) aktiviert")
    print(f" 🛡️ Fail-Open bei Cache-Fehlern aktiv")
    print(f"{'=' * 60}")
def deactivate_all_shops():
    """Interactively remove GeoIP blocking from every active shop.

    After a confirmation, each shop's ``index.php`` is restored, its blocking
    script, cache, log and queue files are deleted, it is deregistered and
    its CrowdSec decisions are cleaned up (when CrowdSec is running). The
    watcher service is uninstalled at the end.

    A shop can be active with only the blocking script present and no
    ``index.php`` backup. In that case the ``require_once`` line is stripped
    from ``index.php``, as ``deactivate_blocking`` does; otherwise deleting
    the blocking script would leave ``index.php`` requiring a missing file.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    print(f"\n{'=' * 60}")
    print(f" GeoIP-Blocking für ALLE deaktivieren")
    print(f"{'=' * 60}")
    print(f"\n📋 {len(active_shops)} Shop(s):")
    for shop in active_shops:
        region_info = get_geo_region_info(get_shop_geo_region(shop))
        print(f"{format_shop_with_link11(shop)} {region_info['icon']}")
    if input(f"\nFortfahren? (ja/nein): ").strip().lower() not in ['ja', 'j', 'yes', 'y']:
        print("\n❌ Abgebrochen")
        return

    for i, shop in enumerate(active_shops, 1):
        print(f"\n[{i}/{len(active_shops)}] {shop}")
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
        index_php = os.path.join(httpdocs, 'index.php')
        if os.path.isfile(backup_php):
            shutil.move(backup_php, index_php)
        elif os.path.isfile(index_php):
            # No backup: drop the hook so index.php no longer needs the script.
            with open(index_php, 'r', encoding='utf-8') as f:
                content = f.read()
            kept_lines = [l for l in content.split('\n') if BLOCKING_FILE not in l]
            with open(index_php, 'w', encoding='utf-8') as f:
                f.write('\n'.join(kept_lines))
        for name in (BLOCKING_FILE, CACHE_FILE, LOG_FILE, CROWDSEC_QUEUE_FILE):
            path = os.path.join(httpdocs, name)
            if os.path.isfile(path):
                os.remove(path)
        remove_shop_from_active(shop)
        if check_crowdsec():
            cleanup_crowdsec_decisions(shop)
        print(f" ✅ Deaktiviert")

    uninstall_watcher_service()
    print(f"\n{'=' * 60}")
    print(f" ✅ Alle Shops deaktiviert")
    print(f"{'=' * 60}")
def get_shop_log_stats(shop):
    """Summarize the shop's block log.

    Each log line counts as one block. Lines are expected in the format
    written by the blocking script: ``[ts] IP: <ip> | UA: <ua> | URI: <uri>``.

    The User-Agent part is client-controlled and may hold bytes that are not
    valid UTF-8, so the log is decoded with ``errors='replace'`` instead of
    failing on the first bad byte.

    Returns:
        ``(block_count, ips, activation_time)`` where ``ips`` maps each IP to
        ``{'count': int, 'ua': str}`` (the first User-Agent seen that is not
        ``'Unknown'``) and ``activation_time`` is a ``datetime`` or ``None``.
    """
    log_file = os.path.join(VHOSTS_DIR, shop, 'httpdocs', LOG_FILE)
    php_blocks = 0
    ips = {}
    if os.path.isfile(log_file):
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                php_blocks += 1
                ip = None
                ua = 'Unknown'
                # The "in" checks guarantee that index [1] exists after split.
                if 'IP: ' in line:
                    ip = line.split('IP: ')[1].split(' |')[0].strip()
                if 'UA: ' in line:
                    ua = line.split('UA: ')[1].split(' |')[0].strip()
                if not ip:
                    continue
                entry = ips.setdefault(ip, {'count': 0, 'ua': ua})
                entry['count'] += 1
                if ua != 'Unknown' and entry['ua'] == 'Unknown':
                    entry['ua'] = ua
    return php_blocks, ips, get_shop_activation_time(shop)
def show_logs(shop):
    """Print a report for one shop.

    Shows runtime since activation, number of blocks and blocks per minute,
    the cache validation result, the last 30 log lines and the ten IPs with
    the most blocks (with the bot detected from their User-Agent).

    The log can hold client-controlled bytes that are not valid UTF-8, so it
    is decoded with ``errors='replace'`` when printing the tail.
    """
    httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
    log_file = os.path.join(httpdocs, LOG_FILE)
    shop_mode = get_shop_mode(shop)
    shop_geo = get_shop_geo_region(shop)
    region_info = get_geo_region_info(shop_geo)
    blocks, ips, activation_time = get_shop_log_stats(shop)
    if activation_time:
        runtime = (datetime.now() - activation_time).total_seconds() / 60
        req_min = blocks / runtime if runtime > 0 else 0
    else:
        runtime, req_min = 0, 0

    print(f"\n{'' * 70}")
    print(f"📊 {shop} | {region_info['icon']} {region_info['name']}")
    print(f"{'' * 70}")
    print(f"⏱️ Laufzeit: {format_duration(runtime)}")
    print(f"📈 Blocks: {blocks} ({req_min:.1f} req/min)")
    valid, count, err = validate_existing_cache(httpdocs, shop_geo)
    print(f"✅ Cache: {count:,} Ranges" if valid else f"⚠️ Cache: {err}")

    if os.path.isfile(log_file):
        print(f"\n📝 Letzte 30 Blocks:")
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f.readlines()[-30:]:
                print(line.rstrip())

    if ips:
        print(f"\n🔥 Top 10 IPs:")
        for ip, data in sorted(ips.items(), key=lambda x: x[1]['count'], reverse=True)[:10]:
            bot = detect_bot(data['ua'])
            print(f" {ip} ({bot}): {data['count']}x")
def show_all_logs():
    """Print per-shop block counts and the top IPs aggregated over all active shops.

    Returns immediately (after a notice) when no shop is active; otherwise
    waits for Enter at the end so the overview stays on screen.
    """
    active_shops = get_active_shops()
    if not active_shops:
        print("\n⚠️ Keine aktiven Shops")
        return

    # NOTE(review): the separator literal is an empty string as seen here, so
    # these two lines print no rule - the original glyph may have been lost; confirm.
    print(f"\n{'' * 70}")
    print(" 📊 GESAMTÜBERSICHT")
    print(f"{'' * 70}")

    total_blocks = 0
    all_ips = {}
    for shop in active_shops:
        blocks, ips, activation_time = get_shop_log_stats(shop)
        total_blocks += blocks

        region_info = get_geo_region_info(get_shop_geo_region(shop))
        # Runtime in minutes since activation; 0 when no activation time is known.
        runtime = (datetime.now() - activation_time).total_seconds() / 60 if activation_time else 0
        req_min = blocks / runtime if runtime > 0 else 0

        link11_info = check_link11(shop)
        color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
        print(f" {color}{shop}{COLOR_RESET} {region_info['icon']}: {blocks} ({req_min:.1f} req/min)")

        for ip, data in ips.items():
            entry = all_ips.setdefault(ip, {'count': 0, 'ua': data['ua']})
            entry['count'] += data['count']
            # Same rule as the per-shop statistics: a real User-Agent seen in
            # any shop replaces the 'Unknown' placeholder.
            if entry['ua'] == 'Unknown' and data['ua'] != 'Unknown':
                entry['ua'] = data['ua']

    print(f"\n📊 Gesamt: {total_blocks} Blocks")

    if all_ips:
        print("\n🔥 Top 20 IPs:")
        ranked = sorted(all_ips.items(), key=lambda item: item[1]['count'], reverse=True)
        for ip, data in ranked[:20]:
            print(f" {ip} ({detect_bot(data['ua'])}): {data['count']}x")

    input("\nEnter zum Fortfahren...")
def _menu_activate():
    """Menu [1]: pick one inactive shop and activate blocking for it."""
    shops = get_available_shops()
    # Query the active list once instead of once per candidate shop.
    active = set(get_active_shops())
    available = [s for s in shops if s not in active]
    if not available:
        print("\n⚠️ Keine Shops verfügbar")
        return

    print("\n📋 Verfügbare Shops:")
    for i, shop in enumerate(available, 1):
        print(format_shop_with_link11(shop, show_index=i))

    try:
        idx = int(input("\nShop wählen: ").strip()) - 1
        if not 0 <= idx < len(available):
            print("❌ Ungültig")
            return
        geo = select_geo_region()
        mode = select_mode()
        region_info = get_geo_region_info(geo)
        question = f"\n{region_info['icon']} aktivieren für '{available[idx]}'? (ja/nein): "
        if input(question).strip().lower() in ['ja', 'j']:
            activate_blocking(available[idx], mode=mode, geo_region=geo)
    except ValueError:
        # Non-numeric selection.
        print("❌ Ungültig")
    except Exception as exc:
        # Keep the menu alive, but report the real failure instead of
        # disguising it as invalid input. KeyboardInterrupt/SystemExit
        # are deliberately not caught here.
        print(f"❌ Fehler: {exc}")


def _menu_deactivate():
    """Menu [2]: pick one active shop and deactivate blocking for it."""
    active = get_active_shops()
    if not active:
        print("\n⚠️ Keine aktiven Shops")
        return

    print("\n📋 Aktive Shops:")
    for i, shop in enumerate(active, 1):
        region_info = get_geo_region_info(get_shop_geo_region(shop))
        mode_icon = "🛡️" if get_shop_mode(shop) == "php+crowdsec" else "📝"
        print(f" [{i}] {format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")

    try:
        idx = int(input("\nShop wählen: ").strip()) - 1
        if not 0 <= idx < len(active):
            print("❌ Ungültig")
            return
        if input(f"\nDeaktivieren für '{active[idx]}'? (ja/nein): ").strip().lower() in ['ja', 'j']:
            deactivate_blocking(active[idx])
    except ValueError:
        print("❌ Ungültig")
    except Exception as exc:
        print(f"❌ Fehler: {exc}")


def _menu_logs():
    """Menu [3]: show logs for one active shop, or the overview for all of them."""
    active = get_active_shops()
    if not active:
        print("\n⚠️ Keine aktiven Shops")
        return

    print("\n📋 Logs für:")
    print(" [0] 📊 ALLE")
    for i, shop in enumerate(active, 1):
        region_info = get_geo_region_info(get_shop_geo_region(shop))
        print(f" [{i}] {format_shop_with_link11(shop)} {region_info['icon']}")

    try:
        idx = int(input("\nWähle: ").strip())
        if idx == 0:
            show_all_logs()
        elif 1 <= idx <= len(active):
            show_logs(active[idx - 1])
        else:
            print("❌ Ungültig")
    except ValueError:
        print("❌ Ungültig")
    except Exception as exc:
        print(f"❌ Fehler: {exc}")


def _menu_status():
    """Menu [4]: print block count, runtime and cache state for every active shop."""
    shops = get_available_shops()
    active = get_active_shops()
    print(f"\n📊 {len(active)}/{len(shops)} Shops aktiv")
    for shop in active:
        shop_geo = get_shop_geo_region(shop)
        region_info = get_geo_region_info(shop_geo)
        mode_icon = "🛡️" if get_shop_mode(shop) == "php+crowdsec" else "📝"
        blocks, _, activation_time = get_shop_log_stats(shop)
        # Runtime in minutes since activation; 0 when no activation time is known.
        runtime = (datetime.now() - activation_time).total_seconds() / 60 if activation_time else 0
        httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
        valid, count, _ = validate_existing_cache(httpdocs, shop_geo)
        cache_str = f"{count:,}" if valid else "⚠️"
        print(f" {format_shop_with_link11(shop)} {region_info['icon']} {mode_icon}")
        print(f" {blocks} blocks, {format_duration(runtime)}, Cache: {cache_str}")


def main():
    """Run the interactive menu loop of the GeoIP shop blocker manager.

    Prints a banner with the CrowdSec and watcher-service state, then
    repeatedly shows the menu and dispatches the chosen action until the
    user selects "0". Unrecognized choices simply redisplay the menu.
    """
    print("\n" + "=" * 60)
    print(" GeoIP Shop Blocker Manager v3.1.0")
    print(" 🇩🇪🇦🇹🇨🇭 DACH | 🇪🇺 Eurozone+GB (22 Länder)")
    print(" 🛡️ Mit Cache-Validierung und Fail-Open")
    print("=" * 60)

    # NOTE(review): the "ok" marker in the two status lines below is an empty
    # string as seen here - the original glyph may have been lost; confirm.
    print(f" {'' if check_crowdsec() else '⚠️ '} CrowdSec")
    code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
    print(f" {'' if code == 0 and stdout.strip() == 'active' else '⚠️ '} Watcher-Service")

    while True:
        print("\n" + "-" * 40)
        print("[1] Aktivieren (einzeln)")
        print("[2] Deaktivieren (einzeln)")
        print("[3] Logs anzeigen")
        print("[4] Status")
        print("-" * 40)
        print("[5] 🚀 ALLE aktivieren")
        print("[6] 🛑 ALLE deaktivieren")
        print("-" * 40)
        print("[0] Beenden")

        choice = input("\nWähle: ").strip()
        if choice == "1":
            _menu_activate()
        elif choice == "2":
            _menu_deactivate()
        elif choice == "3":
            _menu_logs()
        elif choice == "4":
            _menu_status()
        elif choice == "5":
            activate_all_shops()
        elif choice == "6":
            deactivate_all_shops()
        elif choice == "0":
            print("\n👋 Auf Wiedersehen!")
            break
if __name__ == "__main__":
    # Refuse to run unless the effective user is root (UID 0).
    if os.geteuid() != 0:
        print("❌ Als root ausführen!")
        sys.exit(1)
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C, or Ctrl+D / closed stdin at a prompt: leave quietly
        # instead of dumping a traceback.
        print("\n\n👋 Abgebrochen")
        sys.exit(0)