#!/usr/bin/env python3
"""
JTL-WAFi Dashboard v2.5.0 - WebSocket Real-Time Dashboard
ÄNDERUNG v2.5: Country Rate-Limiting ersetzt GeoIP-Blocking!
- Bot Rate-Limiting: Bots nach Rate-Limit bannen
- Country Rate-Limiting: Länder nach Rate-Limit bannen
- Unlimitierte Länder: Definierte Länder werden nicht limitiert
- Monitor-Only Modus: Nur überwachen ohne zu blocken
- Alle Agent/Shop-Daten im Memory
- DB nur für: Passwort, Tokens, Sessions
v2.5.0: Country Rate-Limiting Feature
"""
import os
import sys
import json
import secrets
import hashlib
import asyncio
import subprocess
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List, Set
from dataclasses import dataclass, field
from collections import deque
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException, Form
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from starlette.middleware.sessions import SessionMiddleware
import uvicorn
# =============================================================================
# VERSION & CONFIG
# =============================================================================
# Dashboard release; passed to FastAPI(version=...) and printed at startup.
VERSION = "2.5.0"
# Directory that holds the persisted JSON files; created on demand.
DATA_DIR = "/var/lib/jtl-wafi"
# Location of the self-signed TLS certificate and its private key.
SSL_DIR = "/var/lib/jtl-wafi/ssl"
SSL_CERT = "/var/lib/jtl-wafi/ssl/server.crt"
SSL_KEY = "/var/lib/jtl-wafi/ssl/server.key"
# JSON file with the admin password hash.
CONFIG_FILE = "/var/lib/jtl-wafi/config.json"
# JSON file mapping agent_id -> agent token.
TOKENS_FILE = "/var/lib/jtl-wafi/tokens.json"
# Seconds without an updated last_seen after which an "online" agent is
# reported as "offline" by DataStore.get_all_agents().
AGENT_TIMEOUT = 120
HISTORY_MAX_POINTS = 1000 # Max data points per shop
# Signing key for the session cookie. Without DASHBOARD_SECRET in the
# environment a fresh random key is generated on every import.
SECRET_KEY = os.environ.get("DASHBOARD_SECRET", secrets.token_hex(32))
# =============================================================================
# UTILITY
# =============================================================================
def utc_now() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
def utc_now_str() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return utc_now().strftime(stamp_format)
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    iso_format = '%Y-%m-%dT%H:%M:%SZ'
    return utc_now().strftime(iso_format)
# =============================================================================
# IN-MEMORY DATA STORE
# =============================================================================
@dataclass
class AgentData:
    """In-memory record for one agent known to the dashboard.

    Only ``id`` and ``hostname`` are required; every other field starts with
    a neutral default and is filled in as the agent reports data.
    """

    # --- Identity ---
    id: str
    hostname: str
    version: str = ""
    os_info: Dict[str, Any] = field(default_factory=dict)

    # --- Lifecycle (timestamps are "YYYY-MM-DD HH:MM:SS" strings) ---
    first_seen: str = ""
    last_seen: str = ""
    approved: bool = False
    token: str = ""
    status: str = "pending"

    # --- Latest system metrics reported via heartbeat ---
    load_1m: float = 0.0
    load_5m: float = 0.0
    memory_percent: float = 0.0
    uptime_seconds: int = 0

    # --- Shop counters reported by the agent ---
    shops_total: int = 0
    shops_active: int = 0
@dataclass
class ShopData:
    """State of one shop as last reported by its agent (v2.5 layout).

    Covers the rate-limit configuration, the most recent statistics and
    bounded history series used for the dashboard graphs.
    """

    # --- Identity and activation state ---
    domain: str
    agent_id: str
    agent_hostname: str = ""
    status: str = "inactive"
    link11: bool = False
    link11_ip: str = ""
    activated: str = ""
    runtime_minutes: float = 0.0

    # --- v2.5 rate-limit configuration ---
    bot_mode: bool = False
    bot_rate_limit: int = 0
    bot_ban_duration: int = 0
    country_mode: bool = False
    country_rate_limit: int = 0
    country_ban_duration: int = 0
    unlimited_countries: List[str] = field(default_factory=list)
    monitor_only: bool = False

    # --- Statistics (extended in v2.5) ---
    log_entries: int = 0
    bot_bans: int = 0
    country_bans: int = 0
    active_bot_bans: int = 0
    active_country_bans: int = 0
    banned_bots: List[str] = field(default_factory=list)
    banned_countries: List[str] = field(default_factory=list)
    req_per_min: float = 0.0
    unique_ips: int = 0
    unique_bots: int = 0
    unique_countries: int = 0
    top_bots: Dict[str, int] = field(default_factory=dict)
    top_ips: Dict[str, int] = field(default_factory=dict)
    top_countries: Dict[str, int] = field(default_factory=dict)
    human_requests: int = 0
    bot_requests: int = 0

    # --- History series for the graphs ---
    # The overall series is capped at HISTORY_MAX_POINTS entries; the per-bot
    # and per-country series are created lazily by the data store.
    history: deque = field(
        default_factory=lambda: deque(maxlen=HISTORY_MAX_POINTS)
    )
    bot_history: Dict[str, deque] = field(default_factory=dict)
    country_history: Dict[str, deque] = field(default_factory=dict)
class DataStore:
    """In-memory store for agents, shops and dashboard sessions.

    Agents, shops and sessions live only in process memory. The admin
    password hash and the per-agent tokens are additionally persisted as JSON
    files (``CONFIG_FILE`` and ``TOKENS_FILE``). No locking is performed and
    no method awaits, so the store is meant to be used from one event loop.
    """

    # Scheme tag and work factor for newly stored password hashes. The
    # iteration count is embedded in each stored hash, so it can be raised
    # later without invalidating existing passwords.
    _PBKDF2_PREFIX = "pbkdf2_sha256"
    _PBKDF2_ITERATIONS = 600_000

    # Statistic attributes of a shop, each with the factory for the value
    # used when a full update omits the key. The order is also the key order
    # of the ``stats`` dict produced by get_all_shops().
    _STAT_FIELDS = (
        ('log_entries', int),
        ('bot_bans', int),
        ('country_bans', int),
        ('active_bot_bans', int),
        ('active_country_bans', int),
        ('banned_bots', list),
        ('banned_countries', list),
        ('req_per_min', int),
        ('unique_ips', int),
        ('unique_bots', int),
        ('unique_countries', int),
        ('top_bots', dict),
        ('top_ips', dict),
        ('top_countries', dict),
        ('human_requests', int),
        ('bot_requests', int),
    )

    def __init__(self):
        self.agents: Dict[str, AgentData] = {}
        self.shops: Dict[str, ShopData] = {}
        self.sessions: Dict[str, Dict] = {}  # token -> {username, expires}
        self._password_hash: Optional[str] = None
        self._tokens: Dict[str, str] = {}  # agent_id -> token
        self._load_persistent_data()

    # === Persistence ===
    @staticmethod
    def _read_json(path: str) -> Any:
        """Return the parsed JSON in ``path``, or None if missing/unreadable."""
        if not os.path.exists(path):
            return None
        try:
            with open(path, 'r') as f:
                return json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers JSONDecodeError and UnicodeDecodeError. A
            # broken file must not stop the dashboard, but it should be seen.
            print(f"⚠️ Konnte {path} nicht laden: {exc}")
            return None

    @staticmethod
    def _write_json(path: str, payload: Any) -> None:
        """Write ``payload`` as JSON to ``path`` atomically with mode 0600."""
        tmp_path = f"{path}.tmp"
        # The files hold secrets: create the temp file owner-only, then
        # rename so readers never observe a half-written file.
        fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(payload, f)
        os.replace(tmp_path, path)

    def _load_persistent_data(self):
        """Load the persisted password hash and agent tokens, if present."""
        os.makedirs(DATA_DIR, exist_ok=True)
        config = self._read_json(CONFIG_FILE)
        if isinstance(config, dict):
            stored_hash = config.get('password_hash')
            self._password_hash = stored_hash if isinstance(stored_hash, str) else None
        tokens = self._read_json(TOKENS_FILE)
        if isinstance(tokens, dict):
            self._tokens = tokens

    def _save_config(self):
        """Persist the password hash to ``CONFIG_FILE``."""
        self._write_json(CONFIG_FILE, {'password_hash': self._password_hash})

    def _save_tokens(self):
        """Persist the agent tokens to ``TOKENS_FILE``."""
        self._write_json(TOKENS_FILE, self._tokens)

    # === Password ===
    @classmethod
    def _hash_password(cls, password: str, salt: Optional[str] = None,
                       iterations: Optional[int] = None) -> str:
        """Return ``prefix$iterations$salt$hexdigest`` for ``password``.

        A random salt and the class-level iteration count are used unless
        both are supplied (as is done when re-deriving for verification).
        """
        if salt is None:
            salt = secrets.token_hex(16)
        if iterations is None:
            iterations = cls._PBKDF2_ITERATIONS
        digest = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), salt.encode(), iterations
        ).hex()
        return f"{cls._PBKDF2_PREFIX}${iterations}${salt}${digest}"

    def get_password_hash(self) -> Optional[str]:
        """Return the stored password hash, or None if no password is set."""
        return self._password_hash

    def set_password(self, password: str):
        """Store a salted PBKDF2 hash of ``password`` and persist it."""
        self._password_hash = self._hash_password(password)
        self._save_config()

    def verify_password(self, password: str) -> bool:
        """Return True if ``password`` matches the stored hash.

        Hashes written by older versions (plain SHA-256 hex digest) are still
        accepted; they are replaced the next time set_password() is called.
        """
        stored = self._password_hash
        if not stored or not isinstance(stored, str):
            return False
        if stored.startswith(self._PBKDF2_PREFIX + "$"):
            try:
                _, iterations, salt, _ = stored.split("$")
                candidate = self._hash_password(password, salt, int(iterations))
            except (ValueError, OverflowError):
                # Malformed stored value: treat as "does not match".
                return False
        else:
            candidate = hashlib.sha256(password.encode()).hexdigest()
        # Constant-time comparison; bytes avoid the ASCII-only str restriction.
        return secrets.compare_digest(candidate.encode(), stored.encode())

    # === Sessions ===
    def _purge_expired_sessions(self):
        """Drop every session whose expiry time has passed."""
        now = utc_now()
        expired = [
            token for token, session in self.sessions.items()
            if now > datetime.fromisoformat(session['expires'])
        ]
        for token in expired:
            del self.sessions[token]

    def create_session(self, username: str) -> str:
        """Create a 24-hour session for ``username`` and return its token."""
        # Sessions that are never presented again would otherwise pile up.
        self._purge_expired_sessions()
        token = secrets.token_hex(32)
        expires = (utc_now() + timedelta(hours=24)).isoformat()
        self.sessions[token] = {'username': username, 'expires': expires}
        return token

    def verify_session(self, token: str) -> Optional[str]:
        """Return the username for a valid session token, else None.

        An expired session is removed as a side effect.
        """
        if not token or token not in self.sessions:
            return None
        session = self.sessions[token]
        expires = datetime.fromisoformat(session['expires'])
        if utc_now() > expires:
            del self.sessions[token]
            return None
        return session['username']

    def delete_session(self, token: str):
        """Remove the session for ``token`` if it exists."""
        self.sessions.pop(token, None)

    # === Agent Tokens ===
    def get_agent_token(self, agent_id: str) -> Optional[str]:
        """Return the persisted token for ``agent_id``, or None."""
        return self._tokens.get(agent_id)

    def set_agent_token(self, agent_id: str, token: str):
        """Store ``token`` for ``agent_id`` and persist all tokens."""
        self._tokens[agent_id] = token
        self._save_tokens()

    # === Agents ===
    def get_or_create_agent(self, agent_id: str, hostname: str) -> AgentData:
        """Return the agent record for ``agent_id``, creating it if needed.

        A newly created record is marked approved/online when a token is
        already stored for the id.
        """
        if agent_id not in self.agents:
            agent = AgentData(
                id=agent_id,
                hostname=hostname,
                first_seen=utc_now_str()
            )
            # NOTE(review): a stored token alone marks the new record as
            # approved; callers must still verify the token the peer
            # presents before trusting the connection.
            if agent_id in self._tokens:
                agent.approved = True
                agent.token = self._tokens[agent_id]
                agent.status = 'online'
            self.agents[agent_id] = agent
        return self.agents[agent_id]

    def get_agent(self, agent_id: str) -> Optional[AgentData]:
        """Return the agent record for ``agent_id``, or None."""
        return self.agents.get(agent_id)

    def get_all_agents(self) -> List[Dict]:
        """Return all agents as dicts for the dashboard.

        An agent whose status is 'online' but whose ``last_seen`` is older
        than ``AGENT_TIMEOUT`` seconds is reported as 'offline'.
        """
        # last_seen strings carry no timezone, so compare against naive UTC.
        now = utc_now().replace(tzinfo=None)
        result = []
        for agent in self.agents.values():
            status = agent.status
            if status == 'online' and agent.last_seen:
                try:
                    last = datetime.strptime(agent.last_seen, "%Y-%m-%d %H:%M:%S")
                except (TypeError, ValueError):
                    last = None  # unparsable timestamp: keep reported status
                if last is not None and (now - last).total_seconds() > AGENT_TIMEOUT:
                    status = 'offline'
            result.append({
                'id': agent.id,
                'hostname': agent.hostname,
                'version': agent.version,
                'status': status,
                'approved': agent.approved,
                'first_seen': agent.first_seen,
                'last_seen': agent.last_seen,
                'load_1m': agent.load_1m,
                'memory_percent': agent.memory_percent,
                'shops_total': agent.shops_total,
                'shops_active': agent.shops_active
            })
        return result

    # === Shops ===
    @staticmethod
    def _record_history(shop, timestamp: str) -> None:
        """Append the shop's current headline figures to its history."""
        shop.history.append({
            'timestamp': timestamp,
            'req_per_min': shop.req_per_min,
            'active_bot_bans': shop.active_bot_bans,
            'active_country_bans': shop.active_country_bans,
            'human_requests': shop.human_requests,
            'bot_requests': shop.bot_requests
        })

    @staticmethod
    def _append_series(series: Dict[str, deque], counts: Dict, timestamp: str) -> None:
        """Append one ``{timestamp, count}`` point per key in ``counts``."""
        for key, count in counts.items():
            if key not in series:
                series[key] = deque(maxlen=HISTORY_MAX_POINTS)
            series[key].append({'timestamp': timestamp, 'count': count})

    def update_shop(self, agent_id: str, agent_hostname: str, shop_data: Dict) -> ShopData:
        """Create or refresh a shop from an agent's full update (v2.5).

        Statistics and the overall history are only touched when the update
        contains a non-empty ``stats`` dict; missing stat keys reset to their
        defaults.
        """
        domain = shop_data.get('domain')
        if domain not in self.shops:
            self.shops[domain] = ShopData(domain=domain, agent_id=agent_id)
        shop = self.shops[domain]
        shop.agent_id = agent_id
        shop.agent_hostname = agent_hostname
        shop.status = shop_data.get('status', 'inactive')
        shop.link11 = bool(shop_data.get('link11'))
        shop.link11_ip = shop_data.get('link11_ip', '')
        shop.activated = shop_data.get('activated', '')
        shop.runtime_minutes = shop_data.get('runtime_minutes', 0)
        # v2.5 configuration; limits/durations arrive as None when a mode
        # is off and are normalised to 0.
        shop.bot_mode = shop_data.get('bot_mode', False)
        shop.bot_rate_limit = shop_data.get('bot_rate_limit') or 0
        shop.bot_ban_duration = shop_data.get('bot_ban_duration') or 0
        shop.country_mode = shop_data.get('country_mode', False)
        shop.country_rate_limit = shop_data.get('country_rate_limit') or 0
        shop.country_ban_duration = shop_data.get('country_ban_duration') or 0
        shop.unlimited_countries = shop_data.get('unlimited_countries', [])
        shop.monitor_only = shop_data.get('monitor_only', False)
        stats = shop_data.get('stats', {})
        if stats:
            for name, make_default in self._STAT_FIELDS:
                setattr(shop, name, stats.get(name, make_default()))
            self._record_history(shop, utc_now_str())
        return shop

    def update_shop_stats(self, domain: str, stats: Dict):
        """Merge a statistics update into a known shop (v2.5).

        Keys missing from ``stats`` keep their current value. Also appends to
        the overall, per-bot and per-country history. Unknown domains are
        ignored.
        """
        shop = self.shops.get(domain)
        if shop is None:
            return
        for name, _ in self._STAT_FIELDS:
            setattr(shop, name, stats.get(name, getattr(shop, name)))
        timestamp = utc_now_str()
        self._record_history(shop, timestamp)
        # Only series present in this update get a new point.
        self._append_series(shop.bot_history, stats.get('top_bots', {}), timestamp)
        self._append_series(shop.country_history, stats.get('top_countries', {}), timestamp)

    def get_shop(self, domain: str) -> Optional[ShopData]:
        """Return the shop record for ``domain``, or None."""
        return self.shops.get(domain)

    def get_all_shops(self) -> List[Dict]:
        """Return all shops as dicts in the v2.5 structure."""
        return [
            {
                'domain': shop.domain,
                'agent_id': shop.agent_id,
                'agent_hostname': shop.agent_hostname,
                'status': shop.status,
                'link11': shop.link11,
                'link11_ip': shop.link11_ip,
                'activated': shop.activated,
                'runtime_minutes': shop.runtime_minutes,
                # v2.5 configuration
                'bot_mode': shop.bot_mode,
                'bot_rate_limit': shop.bot_rate_limit,
                'bot_ban_duration': shop.bot_ban_duration,
                'country_mode': shop.country_mode,
                'country_rate_limit': shop.country_rate_limit,
                'country_ban_duration': shop.country_ban_duration,
                'unlimited_countries': shop.unlimited_countries,
                'monitor_only': shop.monitor_only,
                # Stats
                'stats': {name: getattr(shop, name) for name, _ in self._STAT_FIELDS},
            }
            for shop in self.shops.values()
        ]

    def get_shop_history(self, domain: str) -> Dict:
        """Return the overall, per-bot and per-country history of a shop.

        All series are converted to plain lists so the result is JSON
        serialisable. Unknown domains yield empty containers.
        """
        shop = self.shops.get(domain)
        if not shop:
            return {'history': [], 'bot_history': {}, 'country_history': {}}
        return {
            'history': list(shop.history),
            'bot_history': {name: list(points) for name, points in shop.bot_history.items()},
            'country_history': {name: list(points) for name, points in shop.country_history.items()}
        }

    def get_top_shops(self, limit: int = 10, sort_by: str = 'req_per_min') -> List[Dict]:
        """Return shop summaries sorted in descending order.

        ``sort_by='active_bans'`` sorts by bot plus country bans; any other
        value sorts by requests per minute. A falsy ``limit`` returns all.
        """
        shops_list = [
            {
                'domain': shop.domain,
                'agent_hostname': shop.agent_hostname,
                'status': shop.status,
                'req_per_min': shop.req_per_min,
                'active_bot_bans': shop.active_bot_bans,
                'active_country_bans': shop.active_country_bans,
                'link11': shop.link11,
                'bot_mode': shop.bot_mode,
                'country_mode': shop.country_mode,
                'monitor_only': shop.monitor_only
            }
            for shop in self.shops.values()
        ]
        if sort_by == 'active_bans':
            shops_list.sort(key=lambda x: x['active_bot_bans'] + x['active_country_bans'], reverse=True)
        else:
            shops_list.sort(key=lambda x: x['req_per_min'], reverse=True)
        if limit:
            return shops_list[:limit]
        return shops_list

    def get_stats(self) -> Dict:
        """Return aggregate counters over all agents and shops."""
        agents = list(self.agents.values())
        shops = list(self.shops.values())
        shops_total = len(shops)
        shops_link11 = sum(1 for s in shops if s.link11)
        active_bot_bans = sum(s.active_bot_bans for s in shops)
        active_country_bans = sum(s.active_country_bans for s in shops)
        return {
            'agents_online': sum(1 for a in agents if a.approved and a.status == 'online'),
            'agents_pending': sum(1 for a in agents if not a.approved),
            'shops_active': sum(1 for s in shops if s.status == 'active'),
            'shops_total': shops_total,
            'shops_link11': shops_link11,
            'shops_direct': shops_total - shops_link11,
            'req_per_min': round(sum(s.req_per_min for s in shops), 1),
            'active_bot_bans': active_bot_bans,
            'active_country_bans': active_country_bans,
            'active_bans': active_bot_bans + active_country_bans
        }
# Global data store instance; constructing it loads the persisted password
# hash and agent tokens from disk.
store = DataStore()
# =============================================================================
# SSL
# =============================================================================
def generate_ssl_certificate():
    """Create a self-signed certificate/key pair unless both already exist.

    Runs ``openssl req -x509`` to write ``SSL_KEY`` and ``SSL_CERT`` (RSA
    2048, valid 3650 days), then restricts the key to mode 0600 and sets the
    certificate to 0644.

    Raises:
        Exception: any failure is printed and re-raised. For a failing
            openssl run the captured stderr is printed as well.
    """
    os.makedirs(SSL_DIR, exist_ok=True)
    if os.path.exists(SSL_CERT) and os.path.exists(SSL_KEY):
        return
    print("🔐 Generiere SSL-Zertifikat...")
    command = [
        'openssl', 'req', '-x509', '-nodes',
        '-days', '3650',
        '-newkey', 'rsa:2048',
        '-keyout', SSL_KEY,
        '-out', SSL_CERT,
        '-subj', '/CN=jtl-wafi/O=JTL-WAFi/C=DE'
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
        os.chmod(SSL_KEY, 0o600)
        os.chmod(SSL_CERT, 0o644)
    except subprocess.CalledProcessError as e:
        print(f"❌ SSL Fehler: {e}")
        # Output is captured, so without this the actual openssl error
        # message would be lost.
        details = (e.stderr or b"").decode(errors="replace").strip()
        if details:
            print(details)
        raise
    except Exception as e:
        print(f"❌ SSL Fehler: {e}")
        raise
    print(f"✅ SSL-Zertifikat generiert: {SSL_CERT}")
# =============================================================================
# CONNECTION MANAGER
# =============================================================================
class ConnectionManager:
    """Registry of live WebSocket connections for agents and browsers.

    Keeps one socket per agent id and a set of browser sockets, and offers
    helpers to send to one agent or broadcast to every browser.
    """

    def __init__(self):
        self.agent_connections: Dict[str, WebSocket] = {}
        self.browser_connections: Set[WebSocket] = set()
        self.agent_hostnames: Dict[str, str] = {}

    async def connect_agent(self, agent_id: str, hostname: str, websocket: WebSocket):
        """Register ``websocket`` for ``agent_id``, closing a replaced socket."""
        previous = self.agent_connections.get(agent_id)
        # A repeated connect on the same socket must not close that socket.
        if previous is not None and previous is not websocket:
            try:
                await previous.close()
            except Exception:
                # Best effort: the old socket may already be gone.
                pass
        self.agent_connections[agent_id] = websocket
        self.agent_hostnames[agent_id] = hostname
        print(f"✅ Agent verbunden: {hostname}")

    async def disconnect_agent(self, agent_id: str, websocket: Optional[WebSocket] = None):
        """Unregister an agent, mark it offline and notify browsers.

        Args:
            agent_id: id of the agent to unregister.
            websocket: when given, the call is ignored unless this is the
                socket currently registered for ``agent_id``. This lets the
                handler of a replaced connection clean up without removing
                the newer connection.
        """
        if websocket is not None and self.agent_connections.get(agent_id) is not websocket:
            return
        self.agent_connections.pop(agent_id, None)
        hostname = self.agent_hostnames.pop(agent_id, "unknown")
        agent = store.get_agent(agent_id)
        if agent:
            agent.status = 'offline'
            agent.last_seen = utc_now_str()
        print(f"❌ Agent getrennt: {hostname}")
        await self.broadcast_to_browsers({
            'type': 'agent.offline',
            'data': {'agent_id': agent_id, 'hostname': hostname}
        })

    async def connect_browser(self, websocket: WebSocket):
        """Add a browser socket to the broadcast set."""
        self.browser_connections.add(websocket)
        print(f"🌐 Browser verbunden (Total: {len(self.browser_connections)})")

    async def disconnect_browser(self, websocket: WebSocket):
        """Remove a browser socket from the broadcast set."""
        self.browser_connections.discard(websocket)
        print(f"🌐 Browser getrennt (Total: {len(self.browser_connections)})")

    async def send_to_agent(self, agent_id: str, message: Dict):
        """Send ``message`` as JSON to the agent if it is connected.

        Send errors are printed and otherwise ignored.
        """
        ws = self.agent_connections.get(agent_id)
        if ws:
            try:
                await ws.send_json(message)
            except Exception as e:
                print(f"Send to agent error: {e}")

    async def broadcast_to_browsers(self, message: Dict):
        """Send ``message`` to every browser and drop sockets that fail."""
        dead = set()
        # Iterate over a snapshot: browsers can connect or disconnect while a
        # send is awaited, and mutating a set during iteration raises.
        for ws in tuple(self.browser_connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.add(ws)
        self.browser_connections -= dead

    def get_agent_for_shop(self, domain: str) -> Optional[str]:
        """Return the id of the agent that reported ``domain``, or None."""
        shop = store.get_shop(domain)
        return shop.agent_id if shop else None

    def is_agent_connected(self, agent_id: str) -> bool:
        """Return True if a socket is registered for ``agent_id``."""
        return agent_id in self.agent_connections
# Global connection manager shared by the WebSocket handlers and HTTP endpoints
manager = ConnectionManager()
# =============================================================================
# FASTAPI APP
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: make sure the TLS certificate exists at startup.

    Nothing is done on shutdown.
    """
    generate_ssl_certificate()
    yield
# Application instance; the lifespan hook runs once at startup.
app = FastAPI(title="JTL-WAFi Dashboard", version=VERSION, lifespan=lifespan)
# Signed cookie session ("jtl_wafi_session", 24 h) that carries the dashboard
# session token looked up by get_current_user().
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY, session_cookie="jtl_wafi_session", max_age=86400)
# =============================================================================
# AUTH HELPERS
# =============================================================================
async def get_current_user(request: Request) -> Optional[str]:
    """Return the username bound to the request's session token, or None."""
    session_token = request.session.get("token")
    username = store.verify_session(session_token)
    return username
# =============================================================================
# WEBSOCKET: AGENT
# =============================================================================
@app.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    """WebSocket endpoint for agents.

    Handles ``agent.connect`` (registration and token check),
    ``agent.heartbeat``, ``shop.full_update`` and ``shop.stats`` (store
    updates), and relays ``log.entry``, ``bot.banned`` and ``command.result``
    to browsers. Invalid JSON is ignored; other per-message errors are
    printed and the connection stays open.

    An agent counts as approved on connect only if it presents the token
    stored for its id. The stored token is never sent to a peer that did not
    present it.
    """
    await websocket.accept()
    agent_id = None
    # Event types that are forwarded to browsers without processing.
    relay_events = ('log.entry', 'bot.banned', 'command.result')
    try:
        async for message in websocket.iter_text():
            try:
                data = json.loads(message)
                event_type = data.get('type')
                event_data = data.get('data', {})
                if event_type == 'agent.connect':
                    claimed_id = event_data.get('agent_id')
                    if not claimed_id or not isinstance(claimed_id, str):
                        # Without an id there is nothing to register.
                        print("Agent message error: agent.connect without agent_id")
                        continue
                    agent_id = claimed_id
                    hostname = event_data.get('hostname')
                    token = event_data.get('token')
                    version = event_data.get('version', '')
                    os_info = event_data.get('os_info', {})
                    shops_summary = event_data.get('shops_summary', {})
                    # Register or refresh the agent record.
                    agent = store.get_or_create_agent(agent_id, hostname)
                    agent.hostname = hostname
                    agent.version = version
                    agent.os_info = os_info
                    agent.last_seen = utc_now_str()
                    agent.shops_total = shops_summary.get('total', 0)
                    agent.shops_active = shops_summary.get('active', 0)
                    # Approval depends solely on the presented token. The
                    # record may already be flagged approved (stored token or
                    # an earlier connection), which must not be inherited by
                    # a peer that merely claims the same id.
                    stored_token = store.get_agent_token(agent_id)
                    token_ok = (
                        isinstance(token, str)
                        and isinstance(stored_token, str)
                        and bool(stored_token)
                        and secrets.compare_digest(token.encode(), stored_token.encode())
                    )
                    agent.approved = token_ok
                    if token_ok:
                        agent.token = stored_token
                        agent.status = 'online'
                    else:
                        agent.token = ''
                        agent.status = 'pending'
                    await manager.connect_agent(agent_id, hostname, websocket)
                    # Inform browsers.
                    await manager.broadcast_to_browsers({
                        'type': 'agent.online' if agent.approved else 'agent.pending',
                        'data': {
                            'agent_id': agent_id,
                            'hostname': hostname,
                            'version': version,
                            'status': agent.status,
                            'approved': agent.approved,
                            'shops_total': shops_summary.get('total', 0),
                            'shops_active': shops_summary.get('active', 0)
                        }
                    })
                    # Acknowledge approval; the peer already knows this token.
                    if agent.approved:
                        await websocket.send_json({
                            'type': 'auth.approved',
                            'data': {'token': agent.token}
                        })
                elif event_type == 'agent.heartbeat':
                    agent = store.get_agent(agent_id) if agent_id else None
                    if agent:
                        system = event_data.get('system', {})
                        shops_summary = event_data.get('shops_summary', {})
                        agent.last_seen = utc_now_str()
                        agent.load_1m = system.get('load_1m', 0)
                        agent.load_5m = system.get('load_5m', 0)
                        agent.memory_percent = system.get('memory_percent', 0)
                        agent.uptime_seconds = system.get('uptime_seconds', 0)
                        agent.shops_total = shops_summary.get('total', 0)
                        agent.shops_active = shops_summary.get('active', 0)
                        await manager.broadcast_to_browsers({
                            'type': 'agent.update',
                            'data': {
                                'agent_id': agent_id,
                                'hostname': agent.hostname,
                                'system': system,
                                'shops_summary': shops_summary
                            }
                        })
                # NOTE(review): the data events below are accepted from any
                # registered agent, including ones not yet approved — confirm
                # whether they should require approval.
                elif event_type == 'shop.full_update':
                    if agent_id:
                        agent = store.get_agent(agent_id)
                        hostname = agent.hostname if agent else ''
                        shops = event_data.get('shops', [])
                        for shop_data in shops:
                            # Skip malformed entries instead of storing a
                            # shop without a domain.
                            if not isinstance(shop_data, dict) or not shop_data.get('domain'):
                                continue
                            store.update_shop(agent_id, hostname, shop_data)
                        await manager.broadcast_to_browsers({
                            'type': 'shop.full_update',
                            'data': {
                                'agent_id': agent_id,
                                'hostname': hostname,
                                'shops': shops
                            }
                        })
                elif event_type == 'shop.stats':
                    if agent_id:
                        domain = event_data.get('domain')
                        stats = event_data.get('stats', {})
                        store.update_shop_stats(domain, stats)
                        await manager.broadcast_to_browsers({
                            'type': 'shop.stats',
                            'data': {'domain': domain, 'stats': stats}
                        })
                elif event_type in relay_events:
                    await manager.broadcast_to_browsers({
                        'type': event_type,
                        'data': event_data
                    })
            except json.JSONDecodeError:
                pass
            except Exception as e:
                print(f"Agent message error: {e}")
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Agent WebSocket error: {e}")
    finally:
        # Unregister only if this socket is still the registered one. When
        # the agent reconnected, the registry already points at the new
        # socket and must be left alone.
        if agent_id and manager.agent_connections.get(agent_id) is websocket:
            await manager.disconnect_agent(agent_id)
# =============================================================================
# WEBSOCKET: BROWSER
# =============================================================================
@app.websocket("/ws/dashboard")
async def dashboard_websocket(websocket: WebSocket):
    """WebSocket endpoint for dashboard browsers.

    Requires a valid dashboard session (same cookie session as the HTTP
    endpoints); otherwise the handshake is rejected with close code 1008.
    After accepting, the full initial state is sent and these client events
    are served: ``log.subscribe`` / ``log.unsubscribe`` (forwarded to the
    shop's agent), ``get_shop_history``, ``get_top_shops``,
    ``get_all_shops_sorted`` and ``refresh``. Per-message errors are printed
    and the connection stays open.
    """
    # The socket exposes agents, shops and live logs, so it needs the same
    # authentication as the HTTP API.
    if not store.verify_session(websocket.session.get("token")):
        await websocket.close(code=1008)
        return
    await websocket.accept()
    await manager.connect_browser(websocket)
    try:
        # Send the initial state.
        await websocket.send_json({
            'type': 'initial_state',
            'data': {
                'agents': store.get_all_agents(),
                'shops': store.get_all_shops(),
                'stats': store.get_stats()
            }
        })
        async for message in websocket.iter_text():
            try:
                data = json.loads(message)
                event_type = data.get('type')
                event_data = data.get('data', {})
                if event_type in ('log.subscribe', 'log.unsubscribe'):
                    domain = event_data.get('shop')
                    agent_id = manager.get_agent_for_shop(domain)
                    if agent_id:
                        await manager.send_to_agent(agent_id, {
                            'type': event_type,
                            'data': {'shop': domain}
                        })
                elif event_type == 'get_shop_history':
                    domain = event_data.get('domain')
                    history = store.get_shop_history(domain)
                    await websocket.send_json({
                        'type': 'shop_history',
                        'data': {'domain': domain, **history}
                    })
                elif event_type == 'get_top_shops':
                    sort_by = event_data.get('sort_by', 'req_per_min')
                    limit = event_data.get('limit', 10)
                    # Client input: fall back to the default for anything
                    # that is not a non-negative integer.
                    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
                        limit = 10
                    shops = store.get_top_shops(limit=limit, sort_by=sort_by)
                    await websocket.send_json({
                        'type': 'top_shops',
                        'data': {'shops': shops, 'sort_by': sort_by}
                    })
                elif event_type == 'get_all_shops_sorted':
                    sort_by = event_data.get('sort_by', 'req_per_min')
                    shops = store.get_top_shops(limit=None, sort_by=sort_by)
                    await websocket.send_json({
                        'type': 'all_shops_sorted',
                        'data': {'shops': shops, 'sort_by': sort_by}
                    })
                elif event_type == 'refresh':
                    await websocket.send_json({
                        'type': 'refresh',
                        'data': {
                            'agents': store.get_all_agents(),
                            'shops': store.get_all_shops(),
                            'stats': store.get_stats()
                        }
                    })
            except Exception as e:
                print(f"Browser message error: {e}")
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Browser WebSocket error: {e}")
    finally:
        await manager.disconnect_browser(websocket)
# =============================================================================
# HTTP ENDPOINTS
# =============================================================================
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Serve the setup page, the login page or the dashboard.

    The setup page is shown while no password is configured, the login page
    when the request carries no valid session.
    """
    user = await get_current_user(request)
    password_configured = bool(store.get_password_hash())
    if not password_configured:
        return get_setup_html()
    return get_dashboard_html() if user else get_login_html()
@app.post("/setup")
async def setup(request: Request, password: str = Form(...), confirm: str = Form(...)):
    """Set the initial admin password and log the admin in.

    Responds with HTTP 400 if a password already exists; validation problems
    re-render the setup page with a message.
    """
    if store.get_password_hash():
        raise HTTPException(400, "Passwort bereits gesetzt")
    problem = None
    if password != confirm:
        problem = "Passwörter stimmen nicht überein"
    elif len(password) < 8:
        problem = "Passwort muss mindestens 8 Zeichen haben"
    if problem is not None:
        return HTMLResponse(get_setup_html(problem))
    store.set_password(password)
    request.session["token"] = store.create_session("admin")
    return RedirectResponse(url="/", status_code=303)
@app.post("/login")
async def login(request: Request, password: str = Form(...)):
    """Check the admin password and start a session on success.

    A wrong password re-renders the login page with an error message.
    """
    password_ok = store.verify_password(password)
    if not password_ok:
        return HTMLResponse(get_login_html("Falsches Passwort"))
    request.session["token"] = store.create_session("admin")
    return RedirectResponse(url="/", status_code=303)
@app.get("/logout")
async def logout(request: Request):
    """End the current session, clear the cookie session and redirect to /."""
    session_token = request.session.get("token")
    if session_token:
        store.delete_session(session_token)
    request.session.clear()
    return RedirectResponse(url="/")
@app.post("/api/agents/{agent_id}/approve")
async def approve_agent(agent_id: str, request: Request):
    """Approve an agent: issue a fresh token, persist it and notify everyone.

    Responds with 401 without a valid session and 404 for an unknown agent.
    The token is pushed to the agent only if it is currently connected.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    agent = store.get_agent(agent_id)
    if agent is None:
        raise HTTPException(404, "Agent nicht gefunden")
    # Generate and record the new token.
    new_token = secrets.token_hex(32)
    agent.approved = True
    agent.token = new_token
    agent.status = 'online'
    store.set_agent_token(agent_id, new_token)
    # Hand the token to the agent if it is reachable right now.
    if manager.is_agent_connected(agent_id):
        approval_message = {
            'type': 'auth.approved',
            'data': {'token': new_token}
        }
        await manager.send_to_agent(agent_id, approval_message)
    await manager.broadcast_to_browsers({
        'type': 'agent.approved',
        'data': {'agent_id': agent_id}
    })
    return {"success": True}
@app.post("/api/shops/activate")
async def activate_shop(
    request: Request,
    domain: str = Form(...),
    bot_mode: str = Form("true"),
    bot_rate_limit: int = Form(30),
    bot_ban_duration: int = Form(300),
    country_mode: str = Form("false"),
    country_rate_limit: int = Form(100),
    country_ban_duration: int = Form(600),
    unlimited_countries: str = Form(""),
    monitor_only: str = Form("false")
):
    """Send a ``command.activate`` for one shop to its agent (v2.5).

    Requires a valid session (401 otherwise). Returns a failure payload when
    the shop's agent is unknown or not connected, else the command id.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    # Form values arrive as strings; these spellings count as "true".
    truthy = ('true', '1', 'yes', 'on')
    is_bot_mode = bot_mode.lower() in truthy
    is_country_mode = country_mode.lower() in truthy
    is_monitor_only = monitor_only.lower() in truthy
    # Comma-separated country codes -> list of lower-cased codes.
    countries_list = (
        [code.strip().lower() for code in unlimited_countries.split(',') if code.strip()]
        if unlimited_countries else []
    )
    agent_id = manager.get_agent_for_shop(domain)
    if not (agent_id and manager.is_agent_connected(agent_id)):
        return JSONResponse({"success": False, "error": "Agent nicht verbunden"})
    command_id = secrets.token_hex(8)
    # Monitor-only switches both blocking modes off; limits are only sent
    # for modes that were requested.
    payload = {
        'command_id': command_id,
        'shop': domain,
        'bot_mode': is_bot_mode and not is_monitor_only,
        'bot_rate_limit': bot_rate_limit if is_bot_mode else None,
        'bot_ban_duration': bot_ban_duration if is_bot_mode else None,
        'country_mode': is_country_mode and not is_monitor_only,
        'country_rate_limit': country_rate_limit if is_country_mode else None,
        'country_ban_duration': country_ban_duration if is_country_mode else None,
        'unlimited_countries': countries_list if is_country_mode else [],
        'monitor_only': is_monitor_only
    }
    await manager.send_to_agent(agent_id, {'type': 'command.activate', 'data': payload})
    return {"success": True, "command_id": command_id}
@app.post("/api/shops/deactivate")
async def deactivate_shop(request: Request, domain: str = Form(...)):
    """Send a ``command.deactivate`` for one shop to its agent.

    Requires a valid session (401 otherwise). Returns a failure payload when
    the shop's agent is unknown or not connected, else the command id.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    agent_id = manager.get_agent_for_shop(domain)
    if not (agent_id and manager.is_agent_connected(agent_id)):
        return JSONResponse({"success": False, "error": "Agent nicht verbunden"})
    command_id = secrets.token_hex(8)
    command = {
        'type': 'command.deactivate',
        'data': {'command_id': command_id, 'shop': domain}
    }
    await manager.send_to_agent(agent_id, command)
    return {"success": True, "command_id": command_id}
@app.post("/api/shops/bulk-activate")
async def bulk_activate(
    request: Request,
    bot_mode: str = Form("true"),
    bot_rate_limit: int = Form(30),
    bot_ban_duration: int = Form(300),
    country_mode: str = Form("false"),
    country_rate_limit: int = Form(100),
    country_ban_duration: int = Form(600),
    unlimited_countries: str = Form(""),
    monitor_only: str = Form("false"),
    filter_type: str = Form("all")
):
    """Send ``command.activate`` to every matching inactive shop (v2.5).

    ``filter_type`` 'direct' skips Link11 shops, 'link11' skips all others;
    shops whose agent is not connected are skipped too. Requires a valid
    session (401 otherwise). Returns the number of commands sent.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    # Form values arrive as strings; these spellings count as "true".
    truthy = ('true', '1', 'yes', 'on')
    is_bot_mode = bot_mode.lower() in truthy
    is_country_mode = country_mode.lower() in truthy
    is_monitor_only = monitor_only.lower() in truthy
    # Comma-separated country codes -> list of lower-cased codes.
    countries_list = (
        [code.strip().lower() for code in unlimited_countries.split(',') if code.strip()]
        if unlimited_countries else []
    )
    # Settings are identical for every shop, so build them once.
    settings = {
        'bot_mode': is_bot_mode and not is_monitor_only,
        'bot_rate_limit': bot_rate_limit if is_bot_mode else None,
        'bot_ban_duration': bot_ban_duration if is_bot_mode else None,
        'country_mode': is_country_mode and not is_monitor_only,
        'country_rate_limit': country_rate_limit if is_country_mode else None,
        'country_ban_duration': country_ban_duration if is_country_mode else None,
        'unlimited_countries': countries_list if is_country_mode else [],
        'monitor_only': is_monitor_only
    }
    activated = 0
    for shop in store.get_all_shops():
        excluded_by_filter = (
            (filter_type == 'direct' and shop['link11'])
            or (filter_type == 'link11' and not shop['link11'])
        )
        if shop['status'] == 'active' or excluded_by_filter:
            continue
        agent_id = shop.get('agent_id')
        if not (agent_id and manager.is_agent_connected(agent_id)):
            continue
        command_id = secrets.token_hex(8)
        await manager.send_to_agent(agent_id, {
            'type': 'command.activate',
            'data': {'command_id': command_id, 'shop': shop['domain'], **settings}
        })
        activated += 1
        # Short pause after every fifth command to avoid overloading.
        if activated % 5 == 0:
            await asyncio.sleep(0.1)
    return {"success": True, "activated": activated}
@app.post("/api/shops/bulk-deactivate")
async def bulk_deactivate(request: Request, filter_type: str = Form("all")):
    """Send ``command.deactivate`` to every matching active shop.

    ``filter_type`` 'direct' skips Link11 shops, 'link11' skips all others;
    shops whose agent is not connected are skipped too. Requires a valid
    session (401 otherwise). Returns the number of commands sent.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    deactivated = 0
    for shop in store.get_all_shops():
        excluded_by_filter = (
            (filter_type == 'direct' and shop['link11'])
            or (filter_type == 'link11' and not shop['link11'])
        )
        if shop['status'] != 'active' or excluded_by_filter:
            continue
        agent_id = shop.get('agent_id')
        if not (agent_id and manager.is_agent_connected(agent_id)):
            continue
        command_id = secrets.token_hex(8)
        await manager.send_to_agent(agent_id, {
            'type': 'command.deactivate',
            'data': {'command_id': command_id, 'shop': shop['domain']}
        })
        deactivated += 1
        # Short pause after every fifth command.
        if deactivated % 5 == 0:
            await asyncio.sleep(0.1)
    return {"success": True, "deactivated": deactivated}
@app.post("/api/change-password")
async def change_password(
    request: Request,
    current: str = Form(...),
    new_pw: str = Form(...),
    confirm: str = Form(...)
):
    """Change the admin password after checking the current one.

    Requires a valid session (401 otherwise). Validation problems are
    reported as ``{"success": False, "error": ...}``.
    """
    if not await get_current_user(request):
        raise HTTPException(401)
    error = None
    if not store.verify_password(current):
        error = "Aktuelles Passwort falsch"
    elif new_pw != confirm:
        error = "Neue Passwörter stimmen nicht überein"
    elif len(new_pw) < 8:
        error = "Mindestens 8 Zeichen"
    if error is not None:
        return {"success": False, "error": error}
    store.set_password(new_pw)
    return {"success": True}
@app.get("/api/shop/{domain}/history")
async def get_shop_history_api(domain: str, request: Request):
    """Return the stored history series of a shop (401 without a session)."""
    if not await get_current_user(request):
        raise HTTPException(401)
    payload = {"domain": domain}
    payload.update(store.get_shop_history(domain))
    return payload
# =============================================================================
# HTML TEMPLATES
# =============================================================================
# Build the HTML of the first-run setup page (creation of the admin password).
# `error`, when given, is interpolated into the page as an error message.
# NOTE(review): the markup of this template appears to have been stripped from
# this copy of the file, and the single-quoted f-string below spans several
# lines — restore the original template before editing this function.
def get_setup_html(error: str = None) -> str:
error_html = f'
{error}
' if error else ''
return f'''
JTL-WAFi Dashboard - Setup
🔐 Setup
Erstelle ein Admin-Passwort
{error_html}
'''
# Build the HTML of the login page.
# `error`, when given, is interpolated into the page as an error message.
# NOTE(review): the markup of this template appears to have been stripped from
# this copy of the file, and the single-quoted f-string below spans several
# lines — restore the original template before editing this function.
def get_login_html(error: str = None) -> str:
error_html = f'{error}
' if error else ''
return f'''
JTL-WAFi Dashboard - Login
🌍 JTL-WAFi Dashboard
{error_html}
'''
# Return the static HTML of the main dashboard page (no interpolation).
# NOTE(review): only text fragments of the page are present in this copy of
# the file; the markup seems to have been stripped — restore the original
# template before editing this function.
def get_dashboard_html() -> str:
return '''
JTL-WAFi Dashboard v2.5
⚡ Massenaktionen:
| Status⇅ | Hostname⇅ | Shops⇅ | Load⇅ | Memory⇅ | Zuletzt⇅ | Aktionen |
|---|
| Status⇅ | Domain⇅ | Server⇅ | Modus⇅ | Req/min⇅ | Bans⇅ | Laufzeit⇅ | Aktionen |
|---|
| Status⇅ | Domain⇅ | Server⇅ | Modus⇅ | Req/min⇅ | Bans⇅ | Laufzeit⇅ | Aktionen |
|---|
⚡ Massenaktivierung (v2.5)
📊 Alle Shops
| # | Domain | Server | Status | Req/min | Bans | Typ |
|---|
Server: -
⚙️ Steuerung
Wechseln zu: 📈 Bot-Aktivität über Zeit
'''
def create_systemd_service():
    """Write the systemd unit file for the dashboard and print next steps.

    The unit is written to ``/etc/systemd/system/jtl-wafi.service``; the
    service is neither enabled nor started here.
    """
    unit_path = "/etc/systemd/system/jtl-wafi.service"
    unit_text = """[Unit]
Description=JTL-WAFi Dashboard v2.5 (WebSocket)
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /opt/jtl-wafi/dashboard.py
Restart=always
RestartSec=10
User=root
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
"""
    with open(unit_path, 'w') as unit_file:
        unit_file.write(unit_text)
    for line in ("✅ Service erstellt",
                 " systemctl daemon-reload && systemctl enable --now jtl-wafi"):
        print(line)
def main():
    """Command-line entry point.

    With ``--install-service`` only the systemd unit is written. Otherwise
    the data directory is created, a start banner is printed and uvicorn is
    started, using the self-signed certificate unless ``--no-ssl`` is given.
    """
    import argparse
    cli = argparse.ArgumentParser(description=f"JTL-WAFi Dashboard v{VERSION}")
    cli.add_argument("--host", default="0.0.0.0")
    cli.add_argument("--port", type=int, default=8000)
    cli.add_argument("--no-ssl", action="store_true")
    cli.add_argument("--install-service", action="store_true")
    options = cli.parse_args()
    if options.install_service:
        create_systemd_service()
        return
    os.makedirs(DATA_DIR, exist_ok=True)
    separator = "=" * 60
    print(separator)
    print(f"🌍 JTL-WAFi Dashboard v{VERSION} (In-Memory)")
    print(separator)
    print(f"Host: {options.host}:{options.port}")
    print(f"SSL: {'Nein' if options.no_ssl else 'Ja'}")
    print(separator)
    tls_options = {}
    if not options.no_ssl:
        generate_ssl_certificate()
        tls_options["ssl_certfile"] = SSL_CERT
        tls_options["ssl_keyfile"] = SSL_KEY
    uvicorn.run(app, host=options.host, port=options.port, log_level="info", **tls_options)
# Run the CLI only when the file is executed as a script.
if __name__ == "__main__":
main()