mail-alarm.py hinzugefügt

This commit is contained in:
2025-12-13 19:24:02 +01:00
parent 6e30843963
commit 522de6c5c3

464
mail-alarm.py Normal file
View File

@@ -0,0 +1,464 @@
#!/usr/bin/env python3
"""
Mail-Alarm - IMAP IDLE E-Mail Überwachung mit Pushover Emergency Alerts
"""
import os
import sys
import time
import signal
import logging
import configparser
import subprocess
from pathlib import Path
# Konstanten
CONFIG_DIR = Path("/etc/mail-alarm")
CONFIG_FILE = CONFIG_DIR / "config.ini"
LOG_FILE = Path("/var/log/mail-alarm.log")
SERVICE_NAME = "mail-alarm"
SERVICE_FILE = Path(f"/etc/systemd/system/{SERVICE_NAME}.service")
SCRIPT_PATH = Path("/usr/local/bin/mail-alarm.py")
# Logging Setup
def setup_logging(daemon_mode=False):
"""Konfiguriert das Logging."""
log_format = "%(asctime)s - %(levelname)s - %(message)s"
if daemon_mode:
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
else:
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[logging.StreamHandler()]
)
def check_root():
"""Prüft ob das Script als root läuft."""
if os.geteuid() != 0:
print("❌ Dieses Script muss als root ausgeführt werden!")
print(" Bitte mit 'sudo python3 mail-alarm.py' starten.")
sys.exit(1)
def install_dependencies():
"""Installiert Python-Abhängigkeiten via apt."""
print("\n📦 Installiere Abhängigkeiten...")
packages = ["python3-imapclient", "python3-requests"]
try:
subprocess.run(
["apt-get", "update"],
check=True,
capture_output=True
)
subprocess.run(
["apt-get", "install", "-y"] + packages,
check=True,
capture_output=True
)
print("✅ Abhängigkeiten installiert")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Fehler bei Installation: {e}")
return False
def get_user_input(prompt, default=None, password=False):
"""Holt Benutzereingabe mit optionalem Default-Wert."""
if default:
prompt = f"{prompt} [{default}]: "
else:
prompt = f"{prompt}: "
if password:
import getpass
value = getpass.getpass(prompt)
else:
value = input(prompt)
return value.strip() if value.strip() else default
def create_config():
"""Erstellt die Konfigurationsdatei interaktiv."""
print("\n⚙️ Konfiguration erstellen")
print("-" * 40)
print("\n📧 IMAP Einstellungen:")
imap_server = get_user_input(" IMAP Server", "imap.jtl-software.de")
imap_port = get_user_input(" IMAP Port", "993")
imap_user = get_user_input(" Benutzername/E-Mail")
imap_pass = get_user_input(" Passwort", password=True)
imap_folder = get_user_input(" Ordner", "INBOX")
print("\n📱 Pushover Einstellungen:")
pushover_token = get_user_input(" App Token")
pushover_user = get_user_input(" User Key")
pushover_sound = get_user_input(" Sound (leer für Default)", "")
print("\n⏱️ Alarm Einstellungen:")
retry = get_user_input(" Retry Intervall (Sekunden)", "30")
expire = get_user_input(" Expire Zeit (Sekunden)", "3600")
# Config erstellen
config = configparser.ConfigParser()
config["IMAP"] = {
"server": imap_server,
"port": imap_port,
"username": imap_user,
"password": imap_pass,
"folder": imap_folder
}
config["PUSHOVER"] = {
"token": pushover_token,
"user": pushover_user,
"sound": pushover_sound
}
config["ALARM"] = {
"retry": retry,
"expire": expire,
"title": "Rufbereitschaft",
"message": "ALARM: Neue E-Mail im Rufbereitschafts-Postfach!"
}
# Verzeichnis erstellen und Config speichern
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(CONFIG_FILE, "w") as f:
config.write(f)
# Nur root darf die Config lesen (enthält Passwörter)
os.chmod(CONFIG_FILE, 0o600)
print(f"\n✅ Konfiguration gespeichert in {CONFIG_FILE}")
return True
def load_config():
"""Lädt die Konfiguration."""
if not CONFIG_FILE.exists():
return None
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
return config
def create_systemd_service():
"""Erstellt den systemd Service."""
service_content = f"""[Unit]
Description=Mail-Alarm IMAP Überwachung
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 {SCRIPT_PATH} --daemon
Restart=always
RestartSec=10
StandardOutput=append:{LOG_FILE}
StandardError=append:{LOG_FILE}
[Install]
WantedBy=multi-user.target
"""
with open(SERVICE_FILE, "w") as f:
f.write(service_content)
print(f"✅ Systemd Service erstellt: {SERVICE_FILE}")
def install():
"""Führt die Installation durch."""
print("\n" + "=" * 50)
print("📥 INSTALLATION")
print("=" * 50)
# Abhängigkeiten installieren
if not install_dependencies():
return False
# Script kopieren
print(f"\n📄 Kopiere Script nach {SCRIPT_PATH}...")
current_script = Path(__file__).resolve()
subprocess.run(["cp", str(current_script), str(SCRIPT_PATH)], check=True)
os.chmod(SCRIPT_PATH, 0o755)
print("✅ Script kopiert")
# Log-Datei erstellen
LOG_FILE.touch(exist_ok=True)
os.chmod(LOG_FILE, 0o644)
# Konfiguration erstellen
if not create_config():
return False
# Systemd Service erstellen
print("\n🔧 Erstelle Systemd Service...")
create_systemd_service()
# Service aktivieren und starten
print("\n🚀 Starte Service...")
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", SERVICE_NAME], check=True)
subprocess.run(["systemctl", "start", SERVICE_NAME], check=True)
print("\n" + "=" * 50)
print("✅ INSTALLATION ABGESCHLOSSEN")
print("=" * 50)
print(f"\n📋 Nützliche Befehle:")
print(f" Status: systemctl status {SERVICE_NAME}")
print(f" Logs: journalctl -u {SERVICE_NAME} -f")
print(f" Log-Datei: tail -f {LOG_FILE}")
print(f" Stoppen: systemctl stop {SERVICE_NAME}")
print(f" Starten: systemctl start {SERVICE_NAME}")
return True
def uninstall():
"""Führt die Deinstallation durch."""
print("\n" + "=" * 50)
print("📤 DEINSTALLATION")
print("=" * 50)
# Service stoppen und deaktivieren
print("\n🛑 Stoppe Service...")
subprocess.run(["systemctl", "stop", SERVICE_NAME], capture_output=True)
subprocess.run(["systemctl", "disable", SERVICE_NAME], capture_output=True)
# Service-Datei entfernen
if SERVICE_FILE.exists():
SERVICE_FILE.unlink()
print("✅ Systemd Service entfernt")
subprocess.run(["systemctl", "daemon-reload"], check=True)
# Script entfernen
if SCRIPT_PATH.exists():
SCRIPT_PATH.unlink()
print("✅ Script entfernt")
# Nachfragen ob Config und Logs gelöscht werden sollen
delete_config = input("\n🗑️ Konfiguration löschen? (j/N): ").lower() == "j"
if delete_config and CONFIG_DIR.exists():
import shutil
shutil.rmtree(CONFIG_DIR)
print("✅ Konfiguration gelöscht")
delete_logs = input("🗑️ Log-Datei löschen? (j/N): ").lower() == "j"
if delete_logs and LOG_FILE.exists():
LOG_FILE.unlink()
print("✅ Log-Datei gelöscht")
print("\n" + "=" * 50)
print("✅ DEINSTALLATION ABGESCHLOSSEN")
print("=" * 50)
return True
def show_status():
"""Zeigt den Status des Services an."""
print("\n" + "=" * 50)
print("📊 STATUS")
print("=" * 50)
# Service Status
result = subprocess.run(
["systemctl", "is-active", SERVICE_NAME],
capture_output=True,
text=True
)
status = result.stdout.strip()
if status == "active":
print(f"\n🟢 Service: Läuft")
else:
print(f"\n🔴 Service: {status}")
# Config Status
if CONFIG_FILE.exists():
print(f"✅ Konfiguration: {CONFIG_FILE}")
else:
print(f"❌ Konfiguration: Nicht vorhanden")
# Log Status
if LOG_FILE.exists():
size = LOG_FILE.stat().st_size
print(f"📄 Log-Datei: {LOG_FILE} ({size} Bytes)")
# Letzte Log-Einträge
if LOG_FILE.exists():
print(f"\n📋 Letzte Log-Einträge:")
print("-" * 40)
result = subprocess.run(
["tail", "-5", str(LOG_FILE)],
capture_output=True,
text=True
)
print(result.stdout if result.stdout else " (keine Einträge)")
def send_pushover_alert(config, subject="", sender=""):
"""Sendet einen Pushover Emergency Alert."""
import requests
message = config["ALARM"]["message"]
if subject or sender:
message = f"{message}\n\nVon: {sender}\nBetreff: {subject}"
data = {
"token": config["PUSHOVER"]["token"],
"user": config["PUSHOVER"]["user"],
"message": message,
"title": config["ALARM"]["title"],
"priority": "2",
"retry": config["ALARM"]["retry"],
"expire": config["ALARM"]["expire"]
}
if config["PUSHOVER"]["sound"]:
data["sound"] = config["PUSHOVER"]["sound"]
try:
response = requests.post(
"https://api.pushover.net/1/messages.json",
data=data,
timeout=30
)
response.raise_for_status()
logging.info(f"Pushover Alert gesendet: {subject}")
return True
except Exception as e:
logging.error(f"Fehler beim Senden des Pushover Alerts: {e}")
return False
def run_daemon():
"""Hauptloop für die IMAP IDLE Überwachung."""
from imapclient import IMAPClient
setup_logging(daemon_mode=True)
logging.info("Mail-Alarm Daemon gestartet")
config = load_config()
if not config:
logging.error("Keine Konfiguration gefunden!")
sys.exit(1)
# Signal Handler für sauberes Beenden
def signal_handler(signum, frame):
logging.info("Beende Daemon...")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
while True:
try:
logging.info(f"Verbinde mit {config['IMAP']['server']}...")
with IMAPClient(
host=config["IMAP"]["server"],
port=int(config["IMAP"]["port"]),
ssl=True
) as client:
client.login(
config["IMAP"]["username"],
config["IMAP"]["password"]
)
logging.info("IMAP Login erfolgreich")
client.select_folder(config["IMAP"]["folder"])
logging.info(f"Überwache Ordner: {config['IMAP']['folder']}")
# Initiale Anzahl ungelesener Mails merken
unseen = client.search(["UNSEEN"])
last_unseen_count = len(unseen)
logging.info(f"Aktuelle ungelesene Mails: {last_unseen_count}")
while True:
# IMAP IDLE starten
client.idle()
try:
# Warte auf Änderungen (max. 5 Minuten, dann Reconnect)
responses = client.idle_check(timeout=300)
client.idle_done()
if responses:
# Prüfe auf neue ungelesene Mails
unseen = client.search(["UNSEEN"])
current_unseen_count = len(unseen)
if current_unseen_count > last_unseen_count:
# Neue Mail(s) eingetroffen!
new_count = current_unseen_count - last_unseen_count
logging.info(f"🚨 {new_count} neue E-Mail(s)!")
# Hole Details der neuesten Mail
if unseen:
latest_uid = unseen[-1]
messages = client.fetch([latest_uid], ["ENVELOPE"])
if latest_uid in messages:
envelope = messages[latest_uid][b"ENVELOPE"]
subject = envelope.subject.decode() if envelope.subject else "(kein Betreff)"
sender = envelope.from_[0].mailbox.decode() + "@" + envelope.from_[0].host.decode() if envelope.from_ else "(unbekannt)"
logging.info(f"Von: {sender}, Betreff: {subject}")
send_pushover_alert(config, subject, sender)
else:
send_pushover_alert(config)
else:
send_pushover_alert(config)
last_unseen_count = current_unseen_count
except Exception as e:
client.idle_done()
raise e
except Exception as e:
logging.error(f"Verbindungsfehler: {e}")
logging.info("Warte 30 Sekunden vor erneutem Verbindungsversuch...")
time.sleep(30)
def main_menu():
"""Zeigt das Hauptmenü an."""
while True:
print("\n" + "=" * 50)
print("📬 MAIL-ALARM SETUP")
print("=" * 50)
print("\n 1) Installieren (Service einrichten & starten)")
print(" 2) Deinstallieren (Service stoppen & entfernen)")
print(" 3) Status anzeigen")
print(" 4) Beenden")
choice = input("\nAuswahl [1-4]: ").strip()
if choice == "1":
install()
elif choice == "2":
uninstall()
elif choice == "3":
show_status()
elif choice == "4":
print("\n👋 Auf Wiedersehen!")
sys.exit(0)
else:
print("❌ Ungültige Auswahl")
def main():
"""Hauptfunktion."""
if len(sys.argv) > 1 and sys.argv[1] == "--daemon":
# Daemon-Modus
run_daemon()
else:
# Interaktiver Modus
check_root()
main_menu()
if __name__ == "__main__":
main()