Files
geoip_shop_manager/geoip_shop_manager.py

1931 lines
66 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GeoIP Shop Blocker Manager - DACH & Eurozone Version
2-Component System: PHP blocking + Python watcher (systemd service)
Supports two geo regions:
- DACH: Germany, Austria, Switzerland (3 countries)
- Eurozone+GB: All Eurozone countries + GB + CH (22 countries)
"""
import os
import sys
import shutil
import subprocess
import json
import time
import re
import socket
from datetime import datetime, timedelta
from pathlib import Path
# ANSI Color Codes
COLOR_GREEN = "\033[92m"
COLOR_RED = "\033[91m"
COLOR_YELLOW = "\033[93m"
COLOR_BLUE = "\033[94m"
COLOR_RESET = "\033[0m"
COLOR_BOLD = "\033[1m"
# Link11 IP
LINK11_IP = "128.65.223.106"
# Cache for DNS lookups (to avoid repeated lookups)
DNS_CACHE = {}
# Configuration
VHOSTS_DIR = "/var/www/vhosts"
BACKUP_SUFFIX = ".geoip_backup"
BLOCKING_FILE = "geoip_blocking.php"
CACHE_FILE = "geoip_ip_ranges.cache"
LOG_FILE = "geoip_blocked.log"
CROWDSEC_QUEUE_FILE = "geoip_crowdsec_queue.log"
WATCHER_SCRIPT = "/usr/local/bin/geoip_crowdsec_watcher.py"
SYSTEMD_SERVICE = "/etc/systemd/system/geoip-crowdsec-watcher.service"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
# =============================================================================
# GEO REGIONS
# =============================================================================
GEO_REGIONS = {
"dach": {
"name": "DACH",
"countries": ["de", "at", "ch"],
"description": "Deutschland, Österreich, Schweiz",
"icon": "🇩🇪🇦🇹🇨🇭",
"short": "DACH"
},
"eurozone": {
"name": "Eurozone + GB",
"countries": [
"de", # Deutschland
"at", # Österreich
"ch", # Schweiz
"be", # Belgien
"cy", # Zypern
"ee", # Estland
"es", # Spanien
"fi", # Finnland
"fr", # Frankreich
"gb", # Großbritannien
"gr", # Griechenland
"hr", # Kroatien
"ie", # Irland
"it", # Italien
"lt", # Litauen
"lu", # Luxemburg
"lv", # Lettland
"mt", # Malta
"nl", # Niederlande
"pt", # Portugal
"si", # Slowenien
"sk", # Slowakei
],
"description": "22 Länder: DE, AT, CH, BE, CY, EE, ES, FI, FR, GB, GR, HR, IE, IT, LT, LU, LV, MT, NL, PT, SI, SK",
"icon": "🇪🇺",
"short": "EU+"
}
}
# =============================================================================
# BOT DETECTION - Comprehensive list of known bots/crawlers
# =============================================================================
BOT_PATTERNS = {
# OpenAI
'GPTBot': r'GPTBot',
'OAI-SearchBot': r'OAI-SearchBot',
'ChatGPT-User': r'ChatGPT-User',
# Anthropic (Claude)
'ClaudeBot': r'ClaudeBot',
'Claude-User': r'Claude-User',
'Claude-SearchBot': r'Claude-SearchBot',
'anthropic-ai': r'anthropic-ai',
'claude-web': r'claude-web',
# Google
'Googlebot': r'Googlebot',
'Google-Extended': r'Google-Extended',
'Googlebot-Image': r'Googlebot-Image',
'Googlebot-Video': r'Googlebot-Video',
'Googlebot-News': r'Googlebot-News',
'Gemini-Deep-Research': r'Gemini-Deep-Research',
'Google-CloudVertexBot': r'Google-CloudVertexBot',
'AdsBot-Google': r'AdsBot-Google',
'Mediapartners-Google': r'Mediapartners-Google',
'FeedFetcher-Google': r'FeedFetcher-Google',
'Google-InspectionTool': r'Google-InspectionTool',
# Microsoft/Bing
'Bingbot': r'[Bb]ingbot',
'BingPreview': r'BingPreview',
'msnbot': r'msnbot',
'AdIdxBot': r'AdIdxBot',
# Perplexity
'PerplexityBot': r'PerplexityBot',
'Perplexity-User': r'Perplexity-User',
# Apple
'Applebot': r'Applebot',
'Applebot-Extended': r'Applebot-Extended',
# Amazon
'Amazonbot': r'Amazonbot',
# Meta/Facebook
'FacebookBot': r'facebookexternalhit|FacebookBot',
'meta-externalagent': r'meta-externalagent',
'Meta-WebIndexer': r'Meta-WebIndexer',
# ByteDance/TikTok
'Bytespider': r'Bytespider',
# DuckDuckGo
'DuckDuckBot': r'DuckDuckBot',
'DuckAssistBot': r'DuckAssistBot',
# Other AI/LLM
'cohere-ai': r'cohere-ai',
'YouBot': r'YouBot',
'MistralAI-User': r'MistralAI-User',
'AI2Bot': r'AI2Bot',
'CCBot': r'CCBot',
'Diffbot': r'Diffbot',
'Timpibot': r'Timpibot',
'omgili': r'omgili',
'webzio': r'webzio',
'ICC-Crawler': r'ICC-Crawler',
# SEO Tools
'AhrefsBot': r'AhrefsBot',
'SemrushBot': r'SemrushBot',
'MJ12bot': r'MJ12bot',
'DotBot': r'DotBot',
'BLEXBot': r'BLEXBot',
'DataForSeoBot': r'DataForSeoBot',
'SEOkicks': r'SEOkicks',
'seoscanners': r'seoscanners',
'Screaming Frog': r'Screaming Frog',
'Sistrix': r'Sistrix',
'JEEC2Bot': r'JEEC2Bot',
# Other Search Engines
'YandexBot': r'YandexBot',
'YandexImages': r'YandexImages',
'Baiduspider': r'Baiduspider',
'PetalBot': r'PetalBot',
'Sogou': r'Sogou',
'Qwantify': r'Qwantify',
'ia_archiver': r'ia_archiver',
# Social Media
'LinkedInBot': r'LinkedInBot',
'Twitterbot': r'Twitterbot',
'Pinterest': r'Pinterest',
'Slackbot': r'Slackbot',
'TelegramBot': r'TelegramBot',
'WhatsApp': r'WhatsApp',
'Discordbot': r'Discordbot',
# Monitoring & Security
'UptimeRobot': r'UptimeRobot',
'Pingdom': r'Pingdom',
'StatusCake': r'StatusCake',
'GTmetrix': r'GTmetrix',
'Site24x7': r'Site24x7',
# Payment/E-Commerce
'PayPal IPN': r'PayPal',
'Stripe': r'Stripe',
'Shopify': r'Shopify',
# Feed Readers
'Feedly': r'Feedly',
'NewsBlur': r'NewsBlur',
# Other known bots
'SeznamBot': r'SeznamBot',
'Exabot': r'Exabot',
'archive.org_bot': r'archive\.org_bot',
'Wget': r'Wget',
'curl': r'^curl/',
'python-requests': r'python-requests',
'Go-http-client': r'Go-http-client',
'Java': r'^Java/',
'Apache-HttpClient': r'Apache-HttpClient',
'okhttp': r'okhttp',
'HeadlessChrome': r'HeadlessChrome',
'PhantomJS': r'PhantomJS',
'Scrapy': r'Scrapy',
}
def detect_bot(user_agent):
"""Detect bot name from user agent string"""
if not user_agent or user_agent == 'Unknown':
return 'Unbekannt'
for bot_name, pattern in BOT_PATTERNS.items():
if re.search(pattern, user_agent, re.IGNORECASE):
return bot_name
return 'Unbekannt'
def check_link11(domain):
"""Check if domain resolves to Link11 IP"""
global DNS_CACHE
# Check cache first
if domain in DNS_CACHE:
return DNS_CACHE[domain]
try:
ip = socket.gethostbyname(domain)
is_link11 = (ip == LINK11_IP)
DNS_CACHE[domain] = {'is_link11': is_link11, 'ip': ip}
return DNS_CACHE[domain]
except socket.gaierror:
DNS_CACHE[domain] = {'is_link11': False, 'ip': 'N/A'}
return DNS_CACHE[domain]
def format_shop_with_link11(shop, prefix="", show_index=None):
"""Format shop name with Link11 color coding"""
link11_info = check_link11(shop)
if link11_info['is_link11']:
color = COLOR_GREEN
suffix = " [Link11]"
else:
color = COLOR_RED
suffix = " [Direkt]"
if show_index is not None:
return f" [{show_index}] {color}{shop}{suffix}{COLOR_RESET}"
else:
return f"{prefix}{color}{shop}{suffix}{COLOR_RESET}"
def get_geo_region_info(geo_region):
"""Get info for a geo region"""
return GEO_REGIONS.get(geo_region, GEO_REGIONS["dach"])
def generate_php_countries_array(geo_region):
"""Generate PHP array string for countries"""
region_info = get_geo_region_info(geo_region)
countries = region_info["countries"]
return ", ".join([f"'{c}'" for c in countries])
# PHP GeoIP blocking script template (with CrowdSec)
GEOIP_SCRIPT_TEMPLATE = '''<?php
/**
* GeoIP Blocking Script - Blocks all IPs outside allowed region
* Region: {region_name} ({region_description})
* Logs blocked IPs for CrowdSec watcher to process
* Valid until: {expiry_date}
*/
// Auto-disable after 72 hours
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) {{
return; // Script expired, allow all traffic
}}
// Get visitor IP
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) {{
return;
}}
// Skip private IPs
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {{
return;
}}
// Files
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400; // 24 hours
$log_file = __DIR__ . '/{log_file}';
$crowdsec_queue = __DIR__ . '/{crowdsec_queue}';
// Allowed countries
$allowed_countries = [{countries_array}];
// Function to download IP ranges for allowed countries
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$content = @file_get_contents($url);
if ($content !== false) {{
$lines = explode("\\n", trim($content));
foreach ($lines as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) {{
$ranges[] = $line;
}}
}}
}}
}}
return $ranges;
}}
// Function to check if IP is in CIDR range
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$ip_long = ip2long($ip);
$subnet_long = ip2long($subnet);
$mask_long = -1 << (32 - (int)$mask);
return ($ip_long & $mask_long) == ($subnet_long & $mask_long);
}}
// Load or download IP ranges
$allowed_ranges = [];
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$allowed_ranges = unserialize(file_get_contents($cache_file));
}} else {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (!empty($allowed_ranges)) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
}}
}}
// Check if visitor IP is from allowed region
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{
$is_allowed = true;
break;
}}
}}
// Block non-allowed IPs
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$request_uri = $_SERVER['REQUEST_URI'] ?? '/';
// Log for humans
$log_entry = "[$timestamp] IP: $visitor_ip | UA: $user_agent | URI: $request_uri\\n";
@file_put_contents($log_file, $log_entry, FILE_APPEND | LOCK_EX);
// Queue for CrowdSec (simple format)
$queue_entry = "$timestamp|$visitor_ip|{shop_name}\\n";
@file_put_contents($crowdsec_queue, $queue_entry, FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# PHP GeoIP blocking script template - PHP ONLY (no CrowdSec queue)
GEOIP_SCRIPT_TEMPLATE_PHP_ONLY = '''<?php
/**
* GeoIP Blocking Script - Blocks all IPs outside allowed region (PHP-only mode)
* Region: {region_name} ({region_description})
* Logs blocked IPs (no CrowdSec synchronisation)
* Valid until: {expiry_date}
*/
// Auto-disable after 72 hours
$expiry_date = strtotime('{expiry_timestamp}');
if (time() > $expiry_date) {{
return; // Script expired, allow all traffic
}}
// Get visitor IP
$visitor_ip = $_SERVER['REMOTE_ADDR'] ?? '';
if (empty($visitor_ip)) {{
return;
}}
// Skip private IPs
if (filter_var($visitor_ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {{
return;
}}
// Files
$cache_file = __DIR__ . '/{cache_file}';
$cache_duration = 86400; // 24 hours
$log_file = __DIR__ . '/{log_file}';
// Allowed countries
$allowed_countries = [{countries_array}];
// Function to download IP ranges for allowed countries
function download_allowed_ranges($countries) {{
$ranges = [];
foreach ($countries as $country) {{
$url = "https://www.ipdeny.com/ipblocks/data/aggregated/$country-aggregated.zone";
$content = @file_get_contents($url);
if ($content !== false) {{
$lines = explode("\\n", trim($content));
foreach ($lines as $line) {{
$line = trim($line);
if (!empty($line) && strpos($line, '/') !== false) {{
$ranges[] = $line;
}}
}}
}}
}}
return $ranges;
}}
// Function to check if IP is in CIDR range
function ip_in_range($ip, $cidr) {{
list($subnet, $mask) = explode('/', $cidr);
$ip_long = ip2long($ip);
$subnet_long = ip2long($subnet);
$mask_long = -1 << (32 - (int)$mask);
return ($ip_long & $mask_long) == ($subnet_long & $mask_long);
}}
// Load or download IP ranges
$allowed_ranges = [];
if (file_exists($cache_file) && (time() - filemtime($cache_file)) < $cache_duration) {{
$allowed_ranges = unserialize(file_get_contents($cache_file));
}} else {{
$allowed_ranges = download_allowed_ranges($allowed_countries);
if (!empty($allowed_ranges)) {{
@file_put_contents($cache_file, serialize($allowed_ranges));
}}
}}
// Check if visitor IP is from allowed region
$is_allowed = false;
foreach ($allowed_ranges as $range) {{
if (ip_in_range($visitor_ip, $range)) {{
$is_allowed = true;
break;
}}
}}
// Block non-allowed IPs
if (!$is_allowed) {{
$timestamp = date('Y-m-d H:i:s');
$user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$request_uri = $_SERVER['REQUEST_URI'] ?? '/';
// Log for humans
$log_entry = "[$timestamp] IP: $visitor_ip | UA: $user_agent | URI: $request_uri\\n";
@file_put_contents($log_file, $log_entry, FILE_APPEND | LOCK_EX);
header('HTTP/1.1 403 Forbidden');
exit;
}}
'''
# Python watcher script (runs as systemd service)
WATCHER_SCRIPT_CONTENT = '''#!/usr/bin/env python3
"""
GeoIP CrowdSec Watcher Service
Monitors queue files and adds blocked IPs to CrowdSec
"""
import os
import sys
import time
import subprocess
import json
from pathlib import Path
from datetime import datetime
VHOSTS_DIR = "/var/www/vhosts"
QUEUE_FILE = "geoip_crowdsec_queue.log"
ACTIVE_SHOPS_FILE = "/var/lib/crowdsec/geoip_active_shops.json"
PROCESSED_IPS = {} # In-memory cache to avoid re-adding same IP
CHECK_INTERVAL = 5 # Check every 5 seconds
def log(msg):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)
def get_active_shops():
"""Get list of shops with active GeoIP blocking"""
if not os.path.isfile(ACTIVE_SHOPS_FILE):
return {}
try:
with open(ACTIVE_SHOPS_FILE, 'r') as f:
return json.load(f)
except:
return {}
def add_to_crowdsec(ip, shop):
"""Add IP to CrowdSec with 72h ban"""
# Check if already processed recently (within last hour)
now = time.time()
if ip in PROCESSED_IPS and (now - PROCESSED_IPS[ip]) < 3600:
return True
cmd = [
'cscli', 'decisions', 'add',
'--ip', ip,
'--duration', '72h',
'--type', 'ban',
'--reason', f'GeoIP: Non-allowed IP blocked by {shop}'
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
PROCESSED_IPS[ip] = now
log(f"✅ Added {ip} to CrowdSec (from {shop})")
return True
else:
log(f"⚠️ Failed to add {ip}: {result.stderr.strip()}")
return False
except Exception as e:
log(f"❌ Error adding {ip}: {e}")
return False
def process_queue_file(shop_path, shop):
"""Process queue file for a shop"""
queue_file = os.path.join(shop_path, 'httpdocs', QUEUE_FILE)
if not os.path.isfile(queue_file):
return 0
processed = 0
try:
# Read all lines
with open(queue_file, 'r') as f:
lines = f.readlines()
if not lines:
return 0
# Process each line
for line in lines:
line = line.strip()
if not line:
continue
try:
parts = line.split('|')
if len(parts) >= 2:
timestamp = parts[0]
ip = parts[1]
if add_to_crowdsec(ip, shop):
processed += 1
except:
continue
# Clear the file after processing
if processed > 0:
with open(queue_file, 'w') as f:
f.write('')
except Exception as e:
log(f"❌ Error processing {shop}: {e}")
return processed
def main():
log("🚀 GeoIP CrowdSec Watcher started")
while True:
try:
active_shops = get_active_shops()
if not active_shops:
time.sleep(CHECK_INTERVAL)
continue
total_processed = 0
for shop, info in active_shops.items():
shop_path = os.path.join(VHOSTS_DIR, shop)
if os.path.isdir(shop_path):
count = process_queue_file(shop_path, shop)
total_processed += count
if total_processed > 0:
log(f"📊 Processed {total_processed} IPs in this cycle")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
log("👋 Shutting down...")
break
except Exception as e:
log(f"❌ Error in main loop: {e}")
time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
main()
'''
# Systemd service file
SYSTEMD_SERVICE_CONTENT = '''[Unit]
Description=GeoIP CrowdSec Watcher Service
After=network.target crowdsec.service
Wants=crowdsec.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/geoip_crowdsec_watcher.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
'''
def run_command(cmd, capture_output=True):
"""Run a shell command"""
try:
if capture_output:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
return result.returncode, result.stdout, result.stderr
else:
result = subprocess.run(cmd, shell=True, timeout=30)
return result.returncode, "", ""
except Exception as e:
return -1, "", str(e)
def check_crowdsec():
"""Check if CrowdSec is running"""
code, stdout, stderr = run_command("systemctl is-active crowdsec")
return code == 0 and stdout.strip() == "active"
def install_watcher_service():
"""Install the watcher script and systemd service"""
print(" 📦 Installiere CrowdSec-Watcher-Service...")
# Create watcher script
with open(WATCHER_SCRIPT, 'w') as f:
f.write(WATCHER_SCRIPT_CONTENT)
os.chmod(WATCHER_SCRIPT, 0o755)
print(f" ✅ Watcher-Script erstellt: {WATCHER_SCRIPT}")
# Create systemd service
with open(SYSTEMD_SERVICE, 'w') as f:
f.write(SYSTEMD_SERVICE_CONTENT)
print(f" ✅ Systemd-Service erstellt: {SYSTEMD_SERVICE}")
# Reload systemd and start service
run_command("systemctl daemon-reload")
run_command("systemctl enable geoip-crowdsec-watcher.service")
run_command("systemctl start geoip-crowdsec-watcher.service")
# Check if started
time.sleep(1)
code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
if code == 0 and stdout.strip() == "active":
print(" ✅ Service gestartet und läuft")
return True
else:
print(" ⚠️ Service konnte nicht gestartet werden")
return False
def uninstall_watcher_service():
"""Uninstall the watcher script and systemd service"""
print(" 📦 Deinstalliere CrowdSec-Watcher-Service...")
# Stop and disable service
run_command("systemctl stop geoip-crowdsec-watcher.service")
run_command("systemctl disable geoip-crowdsec-watcher.service")
# Remove files
if os.path.isfile(SYSTEMD_SERVICE):
os.remove(SYSTEMD_SERVICE)
print(f" 🗑️ Service-Datei gelöscht")
if os.path.isfile(WATCHER_SCRIPT):
os.remove(WATCHER_SCRIPT)
print(f" 🗑️ Watcher-Script gelöscht")
run_command("systemctl daemon-reload")
print(" ✅ Service deinstalliert")
def add_shop_to_active(shop, mode="php+crowdsec", geo_region="dach"):
"""Add shop to active shops tracking with mode and geo region"""
os.makedirs(os.path.dirname(ACTIVE_SHOPS_FILE), exist_ok=True)
shops = {}
if os.path.isfile(ACTIVE_SHOPS_FILE):
with open(ACTIVE_SHOPS_FILE, 'r') as f:
shops = json.load(f)
shops[shop] = {
"activated": datetime.now().isoformat(),
"expiry": (datetime.now() + timedelta(hours=72)).isoformat(),
"mode": mode, # "php+crowdsec" or "php-only"
"geo_region": geo_region # "dach" or "eurozone"
}
with open(ACTIVE_SHOPS_FILE, 'w') as f:
json.dump(shops, f, indent=2)
def get_shop_mode(shop):
"""Get the blocking mode for a shop"""
if not os.path.isfile(ACTIVE_SHOPS_FILE):
return "php+crowdsec"
try:
with open(ACTIVE_SHOPS_FILE, 'r') as f:
shops = json.load(f)
return shops.get(shop, {}).get("mode", "php+crowdsec")
except:
return "php+crowdsec"
def get_shop_geo_region(shop):
"""Get the geo region for a shop"""
if not os.path.isfile(ACTIVE_SHOPS_FILE):
return "dach"
try:
with open(ACTIVE_SHOPS_FILE, 'r') as f:
shops = json.load(f)
return shops.get(shop, {}).get("geo_region", "dach")
except:
return "dach"
def get_shop_activation_time(shop):
"""Get the activation timestamp for a shop"""
if not os.path.isfile(ACTIVE_SHOPS_FILE):
return None
try:
with open(ACTIVE_SHOPS_FILE, 'r') as f:
shops = json.load(f)
activated_str = shops.get(shop, {}).get("activated")
if activated_str:
return datetime.fromisoformat(activated_str)
except:
pass
return None
def format_duration(minutes):
"""Format minutes as human readable duration"""
if minutes < 60:
return f"{int(minutes)}m"
hours = minutes / 60
if hours < 24:
return f"{int(hours)}h {int(minutes % 60)}m"
days = hours / 24
remaining_hours = hours % 24
return f"{int(days)}d {int(remaining_hours)}h"
def remove_shop_from_active(shop):
"""Remove shop from active shops tracking"""
if not os.path.isfile(ACTIVE_SHOPS_FILE):
return
with open(ACTIVE_SHOPS_FILE, 'r') as f:
shops = json.load(f)
if shop in shops:
del shops[shop]
with open(ACTIVE_SHOPS_FILE, 'w') as f:
json.dump(shops, f, indent=2)
def cleanup_crowdsec_decisions(shop):
"""Remove CrowdSec decisions for a shop"""
if not check_crowdsec():
return
print(f" 🔍 Entferne CrowdSec-Decisions für {shop}...")
total_removed = 0
max_iterations = 50 # Safety limit
iteration = 0
while iteration < max_iterations:
iteration += 1
# Get all decisions with --limit 0 (no pagination)
list_cmd = f"cscli decisions list -o raw --limit 0 | grep '{shop}'"
code, stdout, stderr = run_command(list_cmd)
if code != 0 or not stdout.strip():
break # No more decisions found
# Extract IPs and delete them (process in batches of 100)
lines = stdout.strip().split('\n')
batch_count = 0
for line in lines[:100]: # Process max 100 per iteration
try:
parts = line.split(',')
if len(parts) >= 3:
ip_field = parts[2].strip()
if ':' in ip_field:
ip = ip_field.split(':', 1)[1]
else:
ip = ip_field
if ip:
del_cmd = f"cscli decisions delete --ip {ip}"
del_code, _, _ = run_command(del_cmd)
if del_code == 0:
batch_count += 1
except:
continue
total_removed += batch_count
if batch_count == 0:
break # Nothing deleted in this iteration
# Small progress indicator for large cleanups
if iteration > 1:
print(f" ... {total_removed} Decisions entfernt (Durchlauf {iteration})")
if total_removed > 0:
print(f"{total_removed} Decisions entfernt")
else:
print(" Keine Decisions gefunden")
def get_available_shops():
"""Get list of all shops"""
shops = []
if not os.path.exists(VHOSTS_DIR):
return shops
for entry in os.listdir(VHOSTS_DIR):
shop_path = os.path.join(VHOSTS_DIR, entry)
if os.path.isdir(shop_path) and entry not in ['chroot', 'system', 'default']:
httpdocs = os.path.join(shop_path, 'httpdocs')
if os.path.isdir(httpdocs):
index_php = os.path.join(httpdocs, 'index.php')
if os.path.isfile(index_php):
shops.append(entry)
return sorted(shops)
def get_active_shops():
"""Get list of shops with active blocking"""
active = []
shops = get_available_shops()
for shop in shops:
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
backup_file = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
if os.path.isfile(blocking_file) or os.path.isfile(backup_file):
active.append(shop)
return active
def select_geo_region():
"""Interactive geo region selection"""
print(f"\n🌍 Wähle die Geo-Region:")
print(f" [1] {GEO_REGIONS['dach']['icon']} DACH - {GEO_REGIONS['dach']['description']}")
print(f" [2] {GEO_REGIONS['eurozone']['icon']} Eurozone+GB - {len(GEO_REGIONS['eurozone']['countries'])} Länder")
choice = input(f"\nRegion wählen [1/2]: ").strip()
if choice == "2":
return "eurozone"
else:
return "dach"
def select_mode():
"""Interactive mode selection"""
print(f"\n🔧 Wähle den Blocking-Modus:")
print(f" [1] PHP + CrowdSec (IPs werden an CrowdSec gemeldet)")
print(f" [2] Nur PHP (keine CrowdSec-Synchronisation)")
choice = input(f"\nModus wählen [1/2]: ").strip()
if choice == "2":
return "php-only"
else:
return "php+crowdsec"
def activate_blocking(shop, silent=False, mode="php+crowdsec", geo_region="dach"):
"""Activate GeoIP blocking for a single shop"""
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
index_php = os.path.join(httpdocs, 'index.php')
backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
region_info = get_geo_region_info(geo_region)
if os.path.isfile(backup_php):
if not silent:
print(f"⚠️ GeoIP-Blocking bereits aktiv für {shop}")
return False
if not os.path.isfile(index_php):
if not silent:
print(f"❌ index.php nicht gefunden")
return False
if not silent:
print(f"\n🔧 Aktiviere {region_info['icon']} {region_info['name']} GeoIP-Blocking für: {shop}")
print(f" Erlaubt: {region_info['description']}")
print(f" Modus: {'PHP + CrowdSec' if mode == 'php+crowdsec' else 'Nur PHP'}")
print("=" * 60)
# Step 1: Install watcher service if not exists (only for php+crowdsec mode)
if mode == "php+crowdsec":
active_shops = get_active_shops()
# Check if any shop uses crowdsec mode
crowdsec_shops = [s for s in active_shops if get_shop_mode(s) == "php+crowdsec"]
if not crowdsec_shops: # First shop with crowdsec
if not silent:
print("\n[1/3] Installiere CrowdSec-Watcher-Service...")
if check_crowdsec():
install_watcher_service()
else:
if not silent:
print(" ⚠️ CrowdSec nicht verfügbar - nur PHP-Blocking")
else:
if not silent:
print("\n[1/3] CrowdSec-Watcher-Service bereits aktiv")
else:
if not silent:
print("\n[1/3] CrowdSec-Synchronisation deaktiviert (nur PHP-Modus)")
# Step 2: PHP blocking
if not silent:
print("\n[2/3] Aktiviere PHP-Blocking...")
print(" 📋 Backup erstellen...")
shutil.copy2(index_php, backup_php)
with open(index_php, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
insert_line = 0
for i, line in enumerate(lines):
if 'declare(strict_types' in line:
insert_line = i + 1
break
elif '<?php' in line and insert_line == 0:
insert_line = i + 1
require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
if require_statement not in content:
lines.insert(insert_line, require_statement)
with open(index_php, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
if not silent:
print(" ✏️ index.php modifiziert")
expiry = datetime.now() + timedelta(hours=72)
countries_array = generate_php_countries_array(geo_region)
# Generate PHP script based on mode
if mode == "php+crowdsec":
geoip_content = GEOIP_SCRIPT_TEMPLATE.format(
region_name=region_info['name'],
region_description=region_info['description'],
expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
cache_file=CACHE_FILE,
log_file=LOG_FILE,
crowdsec_queue=CROWDSEC_QUEUE_FILE,
shop_name=shop,
countries_array=countries_array
)
else:
geoip_content = GEOIP_SCRIPT_TEMPLATE_PHP_ONLY.format(
region_name=region_info['name'],
region_description=region_info['description'],
expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
cache_file=CACHE_FILE,
log_file=LOG_FILE,
shop_name=shop,
countries_array=countries_array
)
with open(blocking_file, 'w', encoding='utf-8') as f:
f.write(geoip_content)
if not silent:
print(" 📝 geoip_blocking.php erstellt")
# Step 3: Register shop
if not silent:
print("\n[3/3] Registriere Shop...")
add_shop_to_active(shop, mode, geo_region)
if not silent:
print(" ✅ Shop registriert")
if not silent:
print("\n" + "=" * 60)
print(f"{region_info['icon']} {region_info['name']} GeoIP-Blocking aktiviert für: {shop}")
print(f" Erlaubte Länder: {region_info['description']}")
print(f" Modus: {'PHP + CrowdSec 🛡️' if mode == 'php+crowdsec' else 'Nur PHP 📝'}")
print(f" Gültig bis: {expiry.strftime('%Y-%m-%d %H:%M:%S CET')}")
print(f" PHP-Log: {os.path.join(httpdocs, LOG_FILE)}")
if mode == "php+crowdsec":
print(f" CrowdSec-Queue: {os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)}")
print(f"\n 🔄 Der Watcher-Service synchronisiert blockierte IPs zu CrowdSec")
print("=" * 60)
return True
def deactivate_blocking(shop, silent=False):
"""Deactivate GeoIP blocking for a single shop"""
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
index_php = os.path.join(httpdocs, 'index.php')
backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
cache_file = os.path.join(httpdocs, CACHE_FILE)
log_file = os.path.join(httpdocs, LOG_FILE)
queue_file = os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)
# Get mode and geo region before removing from tracking
shop_mode = get_shop_mode(shop)
shop_geo = get_shop_geo_region(shop)
region_info = get_geo_region_info(shop_geo)
if not silent:
print(f"\n🔧 Deaktiviere {region_info['icon']} {region_info['name']} GeoIP-Blocking für: {shop}")
print(f" Modus war: {'PHP + CrowdSec' if shop_mode == 'php+crowdsec' else 'Nur PHP'}")
print("=" * 60)
# Step 1: Remove PHP blocking
if not silent:
print("\n[1/4] PHP-Blocking entfernen...")
if os.path.isfile(backup_php):
shutil.move(backup_php, index_php)
if not silent:
print(" 📋 index.php wiederhergestellt")
else:
if os.path.isfile(index_php):
with open(index_php, 'r') as f:
content = f.read()
lines = [line for line in content.split('\n') if BLOCKING_FILE not in line]
with open(index_php, 'w') as f:
f.write('\n'.join(lines))
for f in [blocking_file, cache_file, log_file, queue_file]:
if os.path.isfile(f):
os.remove(f)
if not silent:
print(" 🗑️ PHP-Dateien gelöscht")
# Step 2: Remove from tracking
if not silent:
print("\n[2/4] Deregistriere Shop...")
remove_shop_from_active(shop)
if not silent:
print(" ✅ Shop deregistriert")
# Step 3: Clean CrowdSec decisions (only if mode was php+crowdsec)
if not silent:
print("\n[3/4] CrowdSec-Decisions entfernen...")
if shop_mode == "php+crowdsec" and check_crowdsec():
cleanup_crowdsec_decisions(shop)
else:
if not silent:
print(" Keine CrowdSec-Synchronisation aktiv (PHP-only Modus)")
# Step 4: Uninstall service if no more crowdsec shops
if not silent:
print("\n[4/4] Prüfe Watcher-Service...")
remaining_shops = get_active_shops()
crowdsec_shops = [s for s in remaining_shops if get_shop_mode(s) == "php+crowdsec"]
if not crowdsec_shops:
if not silent:
print(" Keine Shops mit CrowdSec-Modus mehr - deinstalliere Service")
uninstall_watcher_service()
else:
if not silent:
print(f" Service bleibt aktiv ({len(crowdsec_shops)} Shop(s) mit CrowdSec-Modus)")
if not silent:
print("\n" + "=" * 60)
print(f"✅ GeoIP-Blocking deaktiviert für: {shop}")
print("=" * 60)
return True
def activate_all_shops():
"""Activate GeoIP blocking for all available shops"""
shops = get_available_shops()
active_shops = get_active_shops()
available_shops = [s for s in shops if s not in active_shops]
if not available_shops:
print("\n⚠️ Keine Shops zum Aktivieren verfügbar")
print(" Alle Shops haben bereits aktives GeoIP-Blocking")
return
print(f"\n{'=' * 60}")
print(f" GeoIP-Blocking für ALLE Shops aktivieren")
print(f"{'=' * 60}")
print(f"\n📋 Folgende {len(available_shops)} Shop(s) werden aktiviert:")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}\n")
for shop in available_shops:
link11_info = check_link11(shop)
color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
link11_tag = "[Link11]" if link11_info['is_link11'] else "[Direkt]"
print(f"{color}{shop} {link11_tag}{COLOR_RESET}")
# Ask for geo region
geo_region = select_geo_region()
region_info = get_geo_region_info(geo_region)
# Ask for mode
mode = select_mode()
mode_display = "PHP + CrowdSec 🛡️" if mode == "php+crowdsec" else "Nur PHP 📝"
print(f"\n⚠️ Dies aktiviert den Schutz für alle oben genannten Shops!")
print(f" Region: {region_info['icon']} {region_info['name']}")
print(f" Modus: {mode_display}")
confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
if confirm not in ['ja', 'j', 'yes', 'y']:
print("\n❌ Abgebrochen")
return
print(f"\n{'=' * 60}")
print(f" Starte Aktivierung ({region_info['icon']} {region_info['name']}, {mode_display})...")
print(f"{'=' * 60}")
success_count = 0
failed_count = 0
failed_shops = []
# Install watcher service first if needed (only for php+crowdsec mode)
if mode == "php+crowdsec":
crowdsec_shops = [s for s in active_shops if get_shop_mode(s) == "php+crowdsec"]
if not crowdsec_shops and check_crowdsec():
print("\n📦 Installiere CrowdSec-Watcher-Service...")
install_watcher_service()
for i, shop in enumerate(available_shops, 1):
print(f"\n[{i}/{len(available_shops)}] Aktiviere: {shop}")
try:
# Use silent mode but show progress
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
index_php = os.path.join(httpdocs, 'index.php')
backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
if os.path.isfile(backup_php):
print(f" ⚠️ Bereits aktiv - überspringe")
continue
if not os.path.isfile(index_php):
print(f" ❌ index.php nicht gefunden")
failed_count += 1
failed_shops.append(shop)
continue
# Create backup
shutil.copy2(index_php, backup_php)
# Modify index.php
with open(index_php, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
insert_line = 0
for idx, line in enumerate(lines):
if 'declare(strict_types' in line:
insert_line = idx + 1
break
elif '<?php' in line and insert_line == 0:
insert_line = idx + 1
require_statement = f"require_once __DIR__ . '/{BLOCKING_FILE}';"
if require_statement not in content:
lines.insert(insert_line, require_statement)
with open(index_php, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
# Create blocking script based on mode
expiry = datetime.now() + timedelta(hours=72)
countries_array = generate_php_countries_array(geo_region)
if mode == "php+crowdsec":
geoip_content = GEOIP_SCRIPT_TEMPLATE.format(
region_name=region_info['name'],
region_description=region_info['description'],
expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
cache_file=CACHE_FILE,
log_file=LOG_FILE,
crowdsec_queue=CROWDSEC_QUEUE_FILE,
shop_name=shop,
countries_array=countries_array
)
else:
geoip_content = GEOIP_SCRIPT_TEMPLATE_PHP_ONLY.format(
region_name=region_info['name'],
region_description=region_info['description'],
expiry_date=expiry.strftime('%Y-%m-%d %H:%M:%S CET'),
expiry_timestamp=expiry.strftime('%Y-%m-%d %H:%M:%S'),
cache_file=CACHE_FILE,
log_file=LOG_FILE,
shop_name=shop,
countries_array=countries_array
)
with open(blocking_file, 'w', encoding='utf-8') as f:
f.write(geoip_content)
# Register shop with mode and geo region
add_shop_to_active(shop, mode, geo_region)
print(f" ✅ Aktiviert (bis {expiry.strftime('%Y-%m-%d %H:%M')})")
success_count += 1
except Exception as e:
print(f" ❌ Fehler: {e}")
failed_count += 1
failed_shops.append(shop)
# Summary
print(f"\n{'=' * 60}")
print(" ZUSAMMENFASSUNG")
print(f"{'=' * 60}")
print(f"\n ✅ Erfolgreich aktiviert: {success_count}")
print(f" ❌ Fehlgeschlagen: {failed_count}")
if failed_shops:
print(f"\n Fehlgeschlagene Shops:")
for shop in failed_shops:
print(f"{shop}")
print(f"\n {region_info['icon']} Region: {region_info['name']}")
print(f" 🔧 Modus: {mode_display}")
print(f" ⏰ Gültig für 72 Stunden")
print(f"{'=' * 60}")
def deactivate_all_shops():
"""Deactivate GeoIP blocking for all active shops"""
active_shops = get_active_shops()
if not active_shops:
print("\n⚠️ Keine Shops mit aktivem GeoIP-Blocking gefunden")
return
print(f"\n{'=' * 60}")
print(f" GeoIP-Blocking für ALLE Shops deaktivieren")
print(f"{'=' * 60}")
print(f"\n📋 Folgende {len(active_shops)} Shop(s) werden deaktiviert:")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}\n")
for shop in active_shops:
link11_info = check_link11(shop)
color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
link11_tag = "[Link11]" if link11_info['is_link11'] else "[Direkt]"
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
print(f"{color}{shop} {link11_tag}{COLOR_RESET} {region_info['icon']}")
print(f"\n⚠️ Dies deaktiviert den Schutz für alle oben genannten Shops!")
print(f"⚠️ Alle zugehörigen CrowdSec-Decisions werden ebenfalls entfernt!")
confirm = input(f"\nFortfahren? (ja/nein): ").strip().lower()
if confirm not in ['ja', 'j', 'yes', 'y']:
print("\n❌ Abgebrochen")
return
print(f"\n{'=' * 60}")
print(" Starte Deaktivierung...")
print(f"{'=' * 60}")
success_count = 0
failed_count = 0
failed_shops = []
for i, shop in enumerate(active_shops, 1):
print(f"\n[{i}/{len(active_shops)}] Deaktiviere: {shop}")
try:
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
index_php = os.path.join(httpdocs, 'index.php')
backup_php = os.path.join(httpdocs, f'index.php{BACKUP_SUFFIX}')
blocking_file = os.path.join(httpdocs, BLOCKING_FILE)
cache_file = os.path.join(httpdocs, CACHE_FILE)
log_file_path = os.path.join(httpdocs, LOG_FILE)
queue_file = os.path.join(httpdocs, CROWDSEC_QUEUE_FILE)
# Restore backup
if os.path.isfile(backup_php):
shutil.move(backup_php, index_php)
print(f" 📋 index.php wiederhergestellt")
else:
if os.path.isfile(index_php):
with open(index_php, 'r') as f:
content = f.read()
lines = [line for line in content.split('\n') if BLOCKING_FILE not in line]
with open(index_php, 'w') as f:
f.write('\n'.join(lines))
# Remove files
for f in [blocking_file, cache_file, log_file_path, queue_file]:
if os.path.isfile(f):
os.remove(f)
# Remove from tracking
remove_shop_from_active(shop)
# Clean CrowdSec decisions
if check_crowdsec():
cleanup_crowdsec_decisions(shop)
print(f" ✅ Deaktiviert")
success_count += 1
except Exception as e:
print(f" ❌ Fehler: {e}")
failed_count += 1
failed_shops.append(shop)
# Uninstall watcher service
print("\n📦 Deinstalliere CrowdSec-Watcher-Service...")
uninstall_watcher_service()
# Summary
print(f"\n{'=' * 60}")
print(" ZUSAMMENFASSUNG")
print(f"{'=' * 60}")
print(f"\n ✅ Erfolgreich deaktiviert: {success_count}")
print(f" ❌ Fehlgeschlagen: {failed_count}")
if failed_shops:
print(f"\n Fehlgeschlagene Shops:")
for shop in failed_shops:
print(f"{shop}")
print(f"\n 🌍 Alle IPs sind nun wieder erlaubt")
print(f"{'=' * 60}")
def get_shop_log_stats(shop):
"""Get log statistics for a single shop including user agents"""
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
log_file = os.path.join(httpdocs, LOG_FILE)
php_blocks = 0
ips = {} # ip -> {'count': N, 'ua': user_agent}
if os.path.isfile(log_file):
with open(log_file, 'r') as f:
for line in f:
php_blocks += 1
# Extract IP and User-Agent from log line
# Format: [timestamp] IP: x.x.x.x | UA: user_agent | URI: /path
ip = None
ua = 'Unknown'
if 'IP: ' in line:
try:
ip = line.split('IP: ')[1].split(' |')[0].strip()
except:
pass
if 'UA: ' in line:
try:
ua = line.split('UA: ')[1].split(' |')[0].strip()
except:
pass
if ip:
if ip not in ips:
ips[ip] = {'count': 0, 'ua': ua}
ips[ip]['count'] += 1
# Update UA if we have a better one (not Unknown)
if ua != 'Unknown' and ips[ip]['ua'] == 'Unknown':
ips[ip]['ua'] = ua
# Get activation time
activation_time = get_shop_activation_time(shop)
return php_blocks, ips, activation_time
def get_crowdsec_stats_by_shop():
"""Get CrowdSec decision counts grouped by shop"""
if not check_crowdsec():
return {}
stats = {}
code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
if code == 0 and stdout:
lines = stdout.strip().split('\n')
for line in lines[1:]: # Skip header
# Find shop name in reason field
for shop in get_active_shops():
if shop in line:
stats[shop] = stats.get(shop, 0) + 1
break
return stats
def show_all_logs():
"""Show combined logs for all active shops"""
active_shops = get_active_shops()
if not active_shops:
print("\n⚠️ Keine aktiven Shops")
return
print(f"\n{'' * 70}")
print(" 📊 GESAMTÜBERSICHT ALLER SHOPS")
print(f"{'' * 70}")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
total_php_blocks = 0
shop_php_stats = {} # shop -> {'blocks': N, 'activation': datetime, 'req_min': float, 'ips': {}}
all_ips = {} # ip -> {'count': N, 'ua': user_agent, 'shops': {shop: count}}
total_minutes = 0
# Collect PHP stats
for shop in active_shops:
blocks, ips, activation_time = get_shop_log_stats(shop)
total_php_blocks += blocks
# Calculate runtime and req/min
if activation_time:
runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
else:
runtime_minutes = 0
req_min = 0
shop_php_stats[shop] = {
'blocks': blocks,
'activation': activation_time,
'runtime_minutes': runtime_minutes,
'req_min': req_min,
'ips': ips # Store IPs per shop for top IP display
}
if runtime_minutes > total_minutes:
total_minutes = runtime_minutes
for ip, data in ips.items():
if ip not in all_ips:
all_ips[ip] = {'count': 0, 'ua': data['ua'], 'shops': {}}
all_ips[ip]['count'] += data['count']
all_ips[ip]['shops'][shop] = data['count']
# Keep the most informative UA
if data['ua'] != 'Unknown' and all_ips[ip]['ua'] == 'Unknown':
all_ips[ip]['ua'] = data['ua']
# Calculate total req/min
total_req_min = total_php_blocks / total_minutes if total_minutes > 0 else 0
# Get CrowdSec stats
crowdsec_stats = get_crowdsec_stats_by_shop()
total_crowdsec = sum(crowdsec_stats.values())
# Display PHP blocks with req/min and top IP per shop
print(f"\n📝 PHP-Blocks gesamt: {total_php_blocks} (⌀ {total_req_min:.1f} req/min, Laufzeit: {format_duration(total_minutes)})")
if shop_php_stats:
for shop in sorted(shop_php_stats.keys()):
stats = shop_php_stats[shop]
count = stats['blocks']
req_min = stats['req_min']
runtime = stats['runtime_minutes']
bar = "" * min(int(req_min * 2), 20) if req_min > 0 else ""
runtime_str = format_duration(runtime) if runtime > 0 else "?"
# Get geo region icon
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
geo_icon = region_info['icon']
# Color shop name based on Link11 status
link11_info = check_link11(shop)
if link11_info['is_link11']:
shop_colored = f"{COLOR_GREEN}{shop}{COLOR_RESET}"
else:
shop_colored = f"{COLOR_RED}{shop}{COLOR_RESET}"
print(f" ├─ {shop_colored} {geo_icon}: {count} ({req_min:.1f} req/min, seit {runtime_str}) {bar}")
# Show top IP for this shop
shop_ips = stats['ips']
if shop_ips and count > 0:
top_ip = max(shop_ips.items(), key=lambda x: x[1]['count'])
top_ip_addr = top_ip[0]
top_ip_count = top_ip[1]['count']
top_ip_ua = top_ip[1]['ua']
top_ip_bot = detect_bot(top_ip_ua)
top_ip_req_min = top_ip_count / runtime if runtime > 0 else 0
# Show bot name or shortened UA if unknown
if top_ip_bot == 'Unbekannt':
display_name = (top_ip_ua[:40] + '...') if len(top_ip_ua) > 43 else top_ip_ua
else:
display_name = top_ip_bot
print(f" │ └─➤ Top: {top_ip_addr} ({display_name}) - {top_ip_count}x, {top_ip_req_min:.1f} req/min")
# Display CrowdSec bans
print(f"\n🛡️ CrowdSec-Bans gesamt: {total_crowdsec}")
if crowdsec_stats:
for shop in sorted(crowdsec_stats.keys()):
count = crowdsec_stats[shop]
bar = "" * min(count // 10, 20) if count > 0 else ""
# Get geo region icon
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
geo_icon = region_info['icon']
# Color shop name based on Link11 status
link11_info = check_link11(shop)
if link11_info['is_link11']:
shop_colored = f"{COLOR_GREEN}{shop}{COLOR_RESET}"
else:
shop_colored = f"{COLOR_RED}{shop}{COLOR_RESET}"
print(f" ├─ {shop_colored} {geo_icon}: {count} {bar}")
elif check_crowdsec():
print(" └─ Keine aktiven Bans")
else:
print(" └─ CrowdSec nicht verfügbar")
# Top blocked IPs with bot detection, req/min, and top shop
if all_ips:
print(f"\n🔥 Top 100 blockierte IPs (alle Shops):")
sorted_ips = sorted(all_ips.items(), key=lambda x: x[1]['count'], reverse=True)[:100]
for ip, data in sorted_ips:
count = data['count']
ua = data['ua']
bot_name = detect_bot(ua)
shops_data = data['shops']
# Calculate req/min for this IP
ip_req_min = count / total_minutes if total_minutes > 0 else 0
# Find top shop for this IP
if shops_data:
top_shop = max(shops_data.items(), key=lambda x: x[1])
top_shop_name = top_shop[0]
top_shop_count = top_shop[1]
# Shorten shop name if too long
if len(top_shop_name) > 25:
top_shop_short = top_shop_name[:22] + '...'
else:
top_shop_short = top_shop_name
# Color shop name based on Link11 status
link11_info = check_link11(top_shop_name)
if link11_info['is_link11']:
top_shop_display = f"{COLOR_GREEN}{top_shop_short}{COLOR_RESET}"
else:
top_shop_display = f"{COLOR_RED}{top_shop_short}{COLOR_RESET}"
else:
top_shop_display = "?"
top_shop_count = 0
# Show bot name or shortened UA if unknown
if bot_name == 'Unbekannt':
display_name = (ua[:35] + '...') if len(ua) > 38 else ua
if display_name == 'Unknown':
display_name = 'Unbekannt'
else:
display_name = bot_name
bar = "" * min(count // 5, 20) if count > 0 else ""
print(f" {ip} ({display_name}): {count} ({ip_req_min:.1f} req/min) → {top_shop_display} [{top_shop_count}x] {bar}")
print(f"\n{'' * 70}")
# Wait for user
input("\nDrücke Enter um fortzufahren...")
def show_logs(shop):
"""Show logs for a single shop"""
httpdocs = os.path.join(VHOSTS_DIR, shop, 'httpdocs')
log_file = os.path.join(httpdocs, LOG_FILE)
shop_mode = get_shop_mode(shop)
shop_geo = get_shop_geo_region(shop)
region_info = get_geo_region_info(shop_geo)
# Get stats
blocks, ips, activation_time = get_shop_log_stats(shop)
# Calculate runtime and req/min
if activation_time:
runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
runtime_str = format_duration(runtime_minutes)
activation_str = activation_time.strftime('%Y-%m-%d %H:%M:%S')
else:
runtime_minutes = 0
req_min = 0
runtime_str = "unbekannt"
activation_str = "unbekannt"
mode_display = "PHP + CrowdSec 🛡️" if shop_mode == "php+crowdsec" else "Nur PHP 📝"
print(f"\n{'' * 70}")
print(f"📊 Logs für {shop}")
print(f" {region_info['icon']} {region_info['name']} | {mode_display}")
print(f"{'' * 70}")
print(f"\n⏱️ Aktiviert: {activation_str}")
print(f"⏱️ Laufzeit: {runtime_str}")
print(f"📈 Blocks: {blocks} ({req_min:.1f} req/min)")
if os.path.isfile(log_file):
print(f"\n📝 Letzte 50 PHP-Blocks:")
print("=" * 70)
with open(log_file, 'r') as f:
lines = f.readlines()
for line in lines[-50:]:
print(line.rstrip())
print("=" * 70)
print(f"Gesamt: {len(lines)}")
# Show top IPs with bot detection and req/min
if ips:
print(f"\n🔥 Top 20 blockierte IPs:")
sorted_ips = sorted(ips.items(), key=lambda x: x[1]['count'], reverse=True)[:20]
for ip, data in sorted_ips:
count = data['count']
ua = data['ua']
bot_name = detect_bot(ua)
ip_req_min = count / runtime_minutes if runtime_minutes > 0 else 0
# Show bot name or shortened UA if unknown
if bot_name == 'Unbekannt':
display_name = (ua[:40] + '...') if len(ua) > 43 else ua
if display_name == 'Unknown':
display_name = 'Unbekannt'
else:
display_name = bot_name
bar = "" * min(count // 5, 20) if count > 0 else ""
print(f" {ip} ({display_name}): {count} ({ip_req_min:.1f} req/min) {bar}")
else:
print(f"\n Keine PHP-Logs für {shop}")
# Only show CrowdSec decisions if mode is php+crowdsec
if shop_mode == "php+crowdsec" and check_crowdsec():
print(f"\n🛡️ CrowdSec Decisions:")
print("=" * 70)
# Use raw output with --limit 0 (no pagination)
code, stdout, _ = run_command("cscli decisions list -o raw --limit 0")
if code == 0 and stdout:
lines = stdout.strip().split('\n')
shop_decisions = []
for line in lines[1:]: # Skip header
if shop in line:
shop_decisions.append(line)
if shop_decisions:
print(f"Aktive Bans: {len(shop_decisions)}")
print("\nLetzte 20 Bans:")
for line in shop_decisions[:20]:
parts = line.split(',')
if len(parts) > 8:
# Column 2: ip (format "Ip:1.2.3.4")
ip_field = parts[2].strip()
if ':' in ip_field:
ip = ip_field.split(':', 1)[1]
else:
ip = ip_field
# Column 8: expiration
expiry = parts[8].strip()
print(f" 🚫 {ip} (bis {expiry})")
if len(shop_decisions) > 20:
print(f" ... und {len(shop_decisions) - 20} weitere")
else:
print("Keine aktiven CrowdSec-Bans für diesen Shop")
else:
print("Konnte Decisions nicht abrufen")
print("=" * 70)
elif shop_mode == "php-only":
print(f"\n📝 CrowdSec-Synchronisation ist für diesen Shop deaktiviert (PHP-only Modus)")
def main():
"""Main menu"""
print("\n" + "=" * 60)
print(" GeoIP Shop Blocker Manager")
print(" Regionen: 🇩🇪🇦🇹🇨🇭 DACH | 🇪🇺 Eurozone+GB (22 Länder)")
print(" PHP + CrowdSec Watcher (systemd service)")
print("=" * 60)
if check_crowdsec():
print(" ✅ CrowdSec: Aktiv")
else:
print(" ⚠️ CrowdSec: Nicht verfügbar")
# Check service status
code, stdout, _ = run_command("systemctl is-active geoip-crowdsec-watcher.service")
if code == 0 and stdout.strip() == "active":
print(" ✅ Watcher-Service: Läuft")
else:
print(" ⚠️ Watcher-Service: Nicht aktiv")
while True:
print("\n" + "-" * 40)
print("[1] GeoIP-Blocking AKTIVIEREN (einzeln)")
print("[2] GeoIP-Blocking DEAKTIVIEREN (einzeln)")
print("[3] Logs anzeigen")
print("[4] Status anzeigen")
print("-" * 40)
print("[5] 🚀 ALLE Shops aktivieren")
print("[6] 🛑 ALLE Shops deaktivieren")
print("-" * 40)
print("[0] Beenden")
choice = input("\nWähle eine Option: ").strip()
if choice == "1":
shops = get_available_shops()
active_shops = get_active_shops()
available_shops = [s for s in shops if s not in active_shops]
if not available_shops:
print("\n⚠️ Keine Shops verfügbar")
continue
print("\n📋 Verfügbare Shops:")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
for i, shop in enumerate(available_shops, 1):
print(format_shop_with_link11(shop, show_index=i))
shop_choice = input("\nWähle einen Shop: ").strip()
try:
shop_idx = int(shop_choice) - 1
if 0 <= shop_idx < len(available_shops):
selected_shop = available_shops[shop_idx]
# Ask for geo region
geo_region = select_geo_region()
region_info = get_geo_region_info(geo_region)
# Ask for mode
mode = select_mode()
mode_display = "PHP + CrowdSec" if mode == "php+crowdsec" else "Nur PHP"
confirm = input(f"\n⚠️ {region_info['icon']} {region_info['name']}-Blocking ({mode_display}) aktivieren für '{selected_shop}'? (ja/nein): ").strip().lower()
if confirm in ['ja', 'j', 'yes', 'y']:
activate_blocking(selected_shop, mode=mode, geo_region=geo_region)
else:
print("❌ Ungültig")
except ValueError:
print("❌ Ungültig")
elif choice == "2":
active_shops = get_active_shops()
if not active_shops:
print("\n⚠️ Keine aktiven Shops")
continue
print("\n📋 Aktive Shops:")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
for i, shop in enumerate(active_shops, 1):
mode = get_shop_mode(shop)
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
mode_icon = "🛡️" if mode == "php+crowdsec" else "📝"
link11_info = check_link11(shop)
color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
link11_tag = "[Link11]" if link11_info['is_link11'] else "[Direkt]"
print(f" [{i}] {color}{shop} {link11_tag}{COLOR_RESET} {region_info['icon']} {mode_icon}")
shop_choice = input("\nWähle einen Shop: ").strip()
try:
shop_idx = int(shop_choice) - 1
if 0 <= shop_idx < len(active_shops):
selected_shop = active_shops[shop_idx]
confirm = input(f"\n⚠️ Deaktivieren für '{selected_shop}'? (ja/nein): ").strip().lower()
if confirm in ['ja', 'j', 'yes', 'y']:
deactivate_blocking(selected_shop)
else:
print("❌ Ungültig")
except ValueError:
print("❌ Ungültig")
elif choice == "3":
active_shops = get_active_shops()
if not active_shops:
print("\n⚠️ Keine aktiven Shops")
continue
print("\n📋 Logs anzeigen für:")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
print(f" [0] 📊 ALLE Shops (Zusammenfassung)")
for i, shop in enumerate(active_shops, 1):
mode = get_shop_mode(shop)
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
mode_icon = "🛡️" if mode == "php+crowdsec" else "📝"
link11_info = check_link11(shop)
color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
link11_tag = "[Link11]" if link11_info['is_link11'] else "[Direkt]"
print(f" [{i}] {color}{shop} {link11_tag}{COLOR_RESET} {region_info['icon']} {mode_icon}")
shop_choice = input("\nWähle eine Option: ").strip()
try:
shop_idx = int(shop_choice)
if shop_idx == 0:
show_all_logs()
elif 1 <= shop_idx <= len(active_shops):
show_logs(active_shops[shop_idx - 1])
else:
print("❌ Ungültig")
except ValueError:
print("❌ Ungültig")
elif choice == "4":
shops = get_available_shops()
active_shops = get_active_shops()
print(f"\n📊 Status:")
print(f" Shops gesamt: {len(shops)}")
print(f" Aktive GeoIP-Blockings: {len(active_shops)}")
print(f" {COLOR_GREEN}Grün = hinter Link11{COLOR_RESET} | {COLOR_RED}Rot = Direkt{COLOR_RESET}")
if active_shops:
for shop in active_shops:
mode = get_shop_mode(shop)
geo_region = get_shop_geo_region(shop)
region_info = get_geo_region_info(geo_region)
mode_icon = "🛡️" if mode == "php+crowdsec" else "📝"
mode_text = "PHP+CS" if mode == "php+crowdsec" else "PHP"
# Get stats
blocks, _, activation_time = get_shop_log_stats(shop)
if activation_time:
runtime_minutes = (datetime.now() - activation_time).total_seconds() / 60
req_min = blocks / runtime_minutes if runtime_minutes > 0 else 0
runtime_str = format_duration(runtime_minutes)
else:
req_min = 0
runtime_str = "?"
# Link11 check
link11_info = check_link11(shop)
color = COLOR_GREEN if link11_info['is_link11'] else COLOR_RED
link11_tag = "[Link11]" if link11_info['is_link11'] else "[Direkt]"
print(f"{color}{shop} {link11_tag}{COLOR_RESET} {region_info['icon']} [{mode_text}] {mode_icon} - {blocks} blocks ({req_min:.1f} req/min, {runtime_str})")
elif choice == "5":
activate_all_shops()
elif choice == "6":
deactivate_all_shops()
elif choice == "0":
print("\n👋 Auf Wiedersehen!")
break
if __name__ == "__main__":
if os.geteuid() != 0:
print("❌ Als root ausführen!")
sys.exit(1)
try:
main()
except KeyboardInterrupt:
print("\n\n👋 Abgebrochen")
sys.exit(0)