#!/usr/bin/env python3
"""
Mail-Alarm - IMAP IDLE mailbox monitoring with Pushover emergency alerts.

Run without arguments (as root) to get the interactive install / uninstall /
status menu. Run with ``--daemon`` to start the monitoring loop; this is the
mode the generated systemd unit uses.
"""

import configparser
import contextlib
import getpass
import logging
import os
import shutil
import signal
import subprocess
import sys
import time
from email.errors import MessageError
from email.header import decode_header, make_header
from pathlib import Path

# Constants
CONFIG_DIR = Path("/etc/mail-alarm")
CONFIG_FILE = CONFIG_DIR / "config.ini"
LOG_FILE = Path("/var/log/mail-alarm.log")
SERVICE_NAME = "mail-alarm"
SERVICE_FILE = Path(f"/etc/systemd/system/{SERVICE_NAME}.service")
SCRIPT_PATH = Path("/usr/local/bin/mail-alarm.py")

# Maximum time to stay in one IDLE command before re-issuing it.
IDLE_TIMEOUT_SECONDS = 300
# Pause between reconnect attempts after a connection error.
RECONNECT_DELAY_SECONDS = 30


def setup_logging(daemon_mode=False):
    """Configure the root logger.

    Args:
        daemon_mode: When true, log to ``LOG_FILE`` in addition to stderr.
    """
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    handlers = [logging.StreamHandler()]
    if daemon_mode:
        # Log messages contain non-ASCII text, so do not depend on the locale.
        handlers.insert(0, logging.FileHandler(LOG_FILE, encoding="utf-8"))
    logging.basicConfig(level=logging.INFO, format=log_format, handlers=handlers)


def check_root():
    """Exit with status 1 unless the script runs with effective UID 0."""
    if os.geteuid() != 0:
        print("❌ Dieses Script muss als root ausgeführt werden!")
        print(" Bitte mit 'sudo python3 mail-alarm.py' starten.")
        sys.exit(1)


def install_dependencies():
    """Install the Python dependencies via apt.

    Returns:
        True on success, False if apt-get is missing or exits non-zero.
    """
    print("\n📦 Installiere Abhängigkeiten...")
    packages = ["python3-imapclient", "python3-requests"]
    try:
        subprocess.run(
            ["apt-get", "update"],
            check=True, capture_output=True, text=True,
        )
        subprocess.run(
            ["apt-get", "install", "-y", *packages],
            check=True, capture_output=True, text=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"❌ Fehler bei Installation: {e}")
        # The captured stderr is the only place apt explains what went wrong.
        stderr = getattr(e, "stderr", None)
        if stderr:
            print(stderr.strip())
        return False
    print("✅ Abhängigkeiten installiert")
    return True


def get_user_input(prompt, default=None, password=False, required=False):
    """Prompt the user for a value.

    Args:
        prompt: Text shown to the user (a ``: `` suffix is appended).
        default: Value returned when the user enters nothing; shown in
            brackets when non-empty.
        password: Read the value without echo.
        required: Keep asking until a non-empty value is available. Has no
            effect when a non-empty ``default`` exists.

    Returns:
        The stripped input, or ``default`` when the input was empty.
    """
    prompt = f"{prompt} [{default}]: " if default else f"{prompt}: "
    read = getpass.getpass if password else input
    while True:
        value = read(prompt).strip()
        if value:
            return value
        if default or not required:
            return default
        print("❌ Eingabe erforderlich.")


def _new_parser():
    """Return a ConfigParser that stores values verbatim.

    Interpolation is disabled because the default ``%`` interpolation makes
    configparser reject or mangle values such as passwords containing ``%``.
    """
    return configparser.ConfigParser(interpolation=None)


def create_config():
    """Interactively create the configuration file.

    The file is created with mode 0o600 because it contains credentials.

    Returns:
        True once the file has been written.
    """
    print("\n⚙️ Konfiguration erstellen")
    print("-" * 40)

    print("\n📧 IMAP Einstellungen:")
    imap_server = get_user_input(" IMAP Server", "imap.jtl-software.de")
    imap_port = get_user_input(" IMAP Port", "993")
    imap_user = get_user_input(" Benutzername/E-Mail", required=True)
    imap_pass = get_user_input(" Passwort", password=True, required=True)
    imap_folder = get_user_input(" Ordner", "INBOX")

    print("\n📱 Pushover Einstellungen:")
    pushover_token = get_user_input(" App Token", required=True)
    pushover_user = get_user_input(" User Key", required=True)
    pushover_sound = get_user_input(" Sound (leer für Default)", "")

    print("\n⏱️ Alarm Einstellungen:")
    retry = get_user_input(" Retry Intervall (Sekunden)", "30")
    expire = get_user_input(" Expire Zeit (Sekunden)", "3600")

    # Build the config
    config = _new_parser()
    config["IMAP"] = {
        "server": imap_server,
        "port": imap_port,
        "username": imap_user,
        "password": imap_pass,
        "folder": imap_folder,
    }
    config["PUSHOVER"] = {
        "token": pushover_token,
        "user": pushover_user,
        "sound": pushover_sound,
    }
    config["ALARM"] = {
        "retry": retry,
        "expire": expire,
        "title": "Rufbereitschaft",
        "message": "ALARM: Neue E-Mail im Rufbereitschafts-Postfach!",
    }

    # Create the directory and write the config. The file is opened with
    # restrictive permissions from the start so the password is never
    # readable by other users, not even briefly.
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    fd = os.open(CONFIG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        config.write(f)
    # os.open() does not change the mode of a file that already existed.
    os.chmod(CONFIG_FILE, 0o600)

    print(f"\n✅ Konfiguration gespeichert in {CONFIG_FILE}")
    return True


def load_config():
    """Load the configuration.

    Returns:
        The parsed ConfigParser, or None when ``CONFIG_FILE`` does not exist.
    """
    if not CONFIG_FILE.exists():
        return None
    config = _new_parser()
    config.read(CONFIG_FILE, encoding="utf-8")
    return config


def create_systemd_service():
    """Write the systemd unit file for the daemon."""
    # stdout/stderr go to the journal. The daemon writes LOG_FILE itself via
    # its FileHandler; appending the streams to the same file as well would
    # record every log line twice.
    service_content = f"""[Unit]
Description=Mail-Alarm IMAP Überwachung
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
ExecStart=/usr/bin/python3 {SCRIPT_PATH} --daemon
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
"""
    with open(SERVICE_FILE, "w", encoding="utf-8") as f:
        f.write(service_content)
    print(f"✅ Systemd Service erstellt: {SERVICE_FILE}")


def install():
    """Run the installation.

    Installs dependencies, copies the script, creates the log file, the
    configuration and the systemd unit, then enables and (re)starts the
    service.

    Returns:
        True on success, False if any step failed.
    """
    print("\n" + "=" * 50)
    print("📥 INSTALLATION")
    print("=" * 50)

    # Install dependencies
    if not install_dependencies():
        return False

    try:
        # Copy the script
        print(f"\n📄 Kopiere Script nach {SCRIPT_PATH}...")
        current_script = Path(__file__).resolve()
        # Copying a file onto itself fails, which happens when the installer
        # is started from the already installed location.
        if current_script != SCRIPT_PATH.resolve():
            shutil.copyfile(current_script, SCRIPT_PATH)
        os.chmod(SCRIPT_PATH, 0o755)
        print("✅ Script kopiert")

        # Create the log file
        LOG_FILE.touch(exist_ok=True)
        os.chmod(LOG_FILE, 0o644)

        # Create the configuration
        if not create_config():
            return False

        # Create the systemd service
        print("\n🔧 Erstelle Systemd Service...")
        create_systemd_service()

        # Enable and start the service. "restart" also starts a stopped
        # unit and makes a re-install pick up the new configuration.
        print("\n🚀 Starte Service...")
        subprocess.run(["systemctl", "daemon-reload"], check=True)
        subprocess.run(["systemctl", "enable", SERVICE_NAME], check=True)
        subprocess.run(["systemctl", "restart", SERVICE_NAME], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"❌ Fehler bei Installation: {e}")
        return False

    print("\n" + "=" * 50)
    print("✅ INSTALLATION ABGESCHLOSSEN")
    print("=" * 50)
    print("\n📋 Nützliche Befehle:")
    print(f" Status: systemctl status {SERVICE_NAME}")
    print(f" Logs: journalctl -u {SERVICE_NAME} -f")
    print(f" Log-Datei: tail -f {LOG_FILE}")
    print(f" Stoppen: systemctl stop {SERVICE_NAME}")
    print(f" Starten: systemctl start {SERVICE_NAME}")
    return True


def uninstall():
    """Run the uninstallation.

    Stops and disables the service, removes the unit file and the installed
    script, and optionally deletes the configuration and the log file.

    Returns:
        True when finished.
    """
    print("\n" + "=" * 50)
    print("📤 DEINSTALLATION")
    print("=" * 50)

    # Stop and disable the service. Failures are ignored on purpose: the
    # unit may not exist or may already be stopped.
    print("\n🛑 Stoppe Service...")
    subprocess.run(["systemctl", "stop", SERVICE_NAME], capture_output=True)
    subprocess.run(["systemctl", "disable", SERVICE_NAME], capture_output=True)

    # Remove the service file
    if SERVICE_FILE.exists():
        SERVICE_FILE.unlink()
        print("✅ Systemd Service entfernt")
    subprocess.run(["systemctl", "daemon-reload"], check=True)

    # Remove the script
    if SCRIPT_PATH.exists():
        SCRIPT_PATH.unlink()
        print("✅ Script entfernt")

    # Ask whether config and logs should be deleted
    delete_config = input("\n🗑️ Konfiguration löschen? (j/N): ").lower() == "j"
    if delete_config and CONFIG_DIR.exists():
        shutil.rmtree(CONFIG_DIR)
        print("✅ Konfiguration gelöscht")

    delete_logs = input("🗑️ Log-Datei löschen? (j/N): ").lower() == "j"
    if delete_logs and LOG_FILE.exists():
        LOG_FILE.unlink()
        print("✅ Log-Datei gelöscht")

    print("\n" + "=" * 50)
    print("✅ DEINSTALLATION ABGESCHLOSSEN")
    print("=" * 50)
    return True


def show_status():
    """Print service state, config presence and the last log lines."""
    print("\n" + "=" * 50)
    print("📊 STATUS")
    print("=" * 50)

    # Service status
    result = subprocess.run(
        ["systemctl", "is-active", SERVICE_NAME],
        capture_output=True, text=True,
    )
    status = result.stdout.strip()
    if status == "active":
        print("\n🟢 Service: Läuft")
    else:
        print(f"\n🔴 Service: {status}")

    # Config status
    if CONFIG_FILE.exists():
        print(f"✅ Konfiguration: {CONFIG_FILE}")
    else:
        print("❌ Konfiguration: Nicht vorhanden")

    if not LOG_FILE.exists():
        return

    # Log status
    size = LOG_FILE.stat().st_size
    print(f"📄 Log-Datei: {LOG_FILE} ({size} Bytes)")

    # Latest log entries
    print("\n📋 Letzte Log-Einträge:")
    print("-" * 40)
    result = subprocess.run(
        ["tail", "-5", str(LOG_FILE)],
        capture_output=True, text=True,
    )
    print(result.stdout if result.stdout else " (keine Einträge)")


def send_pushover_alert(config, subject="", sender=""):
    """Send a Pushover emergency (priority 2) alert.

    Args:
        config: Mapping with ``PUSHOVER`` (token, user, optional sound) and
            ``ALARM`` (message, title, retry, expire) sections.
        subject: Mail subject appended to the message, if known.
        sender: Mail sender appended to the message, if known.

    Returns:
        True if Pushover accepted the request, False on any request error.
    """
    # Imported lazily: the package is installed by this script's own
    # installer and may be missing when the interactive menu runs.
    import requests

    message = config["ALARM"]["message"]
    if subject or sender:
        message = f"{message}\n\nVon: {sender}\nBetreff: {subject}"

    data = {
        "token": config["PUSHOVER"]["token"],
        "user": config["PUSHOVER"]["user"],
        "message": message,
        "title": config["ALARM"]["title"],
        "priority": "2",
        "retry": config["ALARM"]["retry"],
        "expire": config["ALARM"]["expire"],
    }
    # The sound is optional; tolerate configs that lack the key entirely.
    sound = config["PUSHOVER"].get("sound", "")
    if sound:
        data["sound"] = sound

    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data=data,
            timeout=30,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error("Fehler beim Senden des Pushover Alerts: %s", e)
        return False
    logging.info("Pushover Alert gesendet: %s", subject)
    return True


def _to_text(raw):
    """Return ``raw`` (bytes, str or None) as text without ever raising."""
    if raw is None:
        return ""
    if isinstance(raw, bytes):
        return raw.decode("utf-8", errors="replace")
    return str(raw)


def _decode_header(raw, fallback):
    """Decode a raw mail header value, including MIME encoded-words.

    Returns ``fallback`` for an empty value and the undecoded text when the
    encoded-words are malformed or use an unknown charset.
    """
    if not raw:
        return fallback
    text = _to_text(raw)
    try:
        return str(make_header(decode_header(text)))
    except (LookupError, UnicodeError, ValueError, MessageError):
        return text


def _describe_envelope(envelope):
    """Return ``(subject, sender)`` for an IMAP ENVELOPE-like object.

    Reads ``envelope.subject`` and the ``mailbox`` / ``host`` attributes of
    the first entry of ``envelope.from_``; missing parts fall back to
    placeholder texts.
    """
    subject = _decode_header(envelope.subject, "(kein Betreff)")
    sender = "(unbekannt)"
    if envelope.from_:
        address = envelope.from_[0]
        mailbox = _to_text(address.mailbox)
        host = _to_text(address.host)
        if mailbox and host:
            sender = f"{mailbox}@{host}"
        elif mailbox or host:
            sender = mailbox or host
    return subject, sender


def _new_unseen(known, current):
    """Return the sorted UIDs in ``current`` that are not in ``known``."""
    return sorted(set(current) - set(known))


def _fetch_details(client, uid):
    """Return ``(subject, sender)`` for ``uid``, or ``("", "")`` on failure.

    This is deliberately best effort: failing to read the details must never
    prevent the alert itself from being sent.
    """
    try:
        messages = client.fetch([uid], ["ENVELOPE"])
        if uid not in messages:
            return "", ""
        subject, sender = _describe_envelope(messages[uid][b"ENVELOPE"])
    except Exception:
        logging.exception("Konnte Mail-Details nicht lesen")
        return "", ""
    logging.info("Von: %s, Betreff: %s", sender, subject)
    return subject, sender


def _watch_folder(client, config):
    """Wait in IMAP IDLE and alert whenever a new unseen message appears.

    Runs until the connection fails (the exception propagates to the caller)
    or the process exits.
    """
    # Remember which mails are already unseen so only new ones alert.
    known_unseen = set(client.search(["UNSEEN"]))
    logging.info("Aktuelle ungelesene Mails: %d", len(known_unseen))

    while True:
        client.idle()
        try:
            client.idle_check(timeout=IDLE_TIMEOUT_SECONDS)
        except Exception:
            # Try to leave IDLE, but report the original error, not a
            # follow-up failure on a connection that is already broken.
            with contextlib.suppress(Exception):
                client.idle_done()
            raise
        client.idle_done()

        # Re-check after every IDLE cycle, including timeouts. Comparing UID
        # sets instead of counts still detects a new mail when another one
        # was read in the meantime.
        current_unseen = client.search(["UNSEEN"])
        new_uids = _new_unseen(known_unseen, current_unseen)
        known_unseen = set(current_unseen)
        if not new_uids:
            continue

        logging.info("🚨 %d neue E-Mail(s)!", len(new_uids))
        subject, sender = _fetch_details(client, new_uids[-1])
        send_pushover_alert(config, subject, sender)


def run_daemon():
    """Main loop: connect to IMAP, watch the folder, reconnect on errors."""
    # Imported lazily: the package is installed by this script's own
    # installer and may be missing when the interactive menu runs.
    from imapclient import IMAPClient

    setup_logging(daemon_mode=True)
    logging.info("Mail-Alarm Daemon gestartet")

    config = load_config()
    if config is None:
        logging.error("Keine Konfiguration gefunden!")
        sys.exit(1)

    # Signal handler for a clean shutdown
    def signal_handler(signum, frame):
        logging.info("Beende Daemon...")
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    while True:
        try:
            imap = config["IMAP"]
            logging.info("Verbinde mit %s...", imap["server"])
            with IMAPClient(
                host=imap["server"],
                port=int(imap["port"]),
                ssl=True,
            ) as client:
                client.login(imap["username"], imap["password"])
                logging.info("IMAP Login erfolgreich")

                client.select_folder(imap["folder"])
                logging.info("Überwache Ordner: %s", imap["folder"])

                _watch_folder(client, config)
        except Exception as e:
            logging.error("Verbindungsfehler: %s", e)
            logging.info(
                "Warte %d Sekunden vor erneutem Verbindungsversuch...",
                RECONNECT_DELAY_SECONDS,
            )
            time.sleep(RECONNECT_DELAY_SECONDS)


def main_menu():
    """Show the interactive main menu until the user quits."""
    actions = {"1": install, "2": uninstall, "3": show_status}
    while True:
        print("\n" + "=" * 50)
        print("📬 MAIL-ALARM SETUP")
        print("=" * 50)
        print("\n 1) Installieren (Service einrichten & starten)")
        print(" 2) Deinstallieren (Service stoppen & entfernen)")
        print(" 3) Status anzeigen")
        print(" 4) Beenden")

        try:
            choice = input("\nAuswahl [1-4]: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the prompt means "quit".
            choice = "4"

        if choice == "4":
            print("\n👋 Auf Wiedersehen!")
            sys.exit(0)

        action = actions.get(choice)
        if action is None:
            print("❌ Ungültige Auswahl")
        else:
            action()


def main():
    """Entry point: ``--daemon`` runs the monitor, otherwise the menu."""
    if len(sys.argv) > 1 and sys.argv[1] == "--daemon":
        # Daemon mode
        run_daemon()
    else:
        # Interactive mode
        check_root()
        main_menu()


if __name__ == "__main__":
    main()