Files
Link11/04_create_cert_set_cert_in_sg_lb.py
2025-12-05 16:06:05 +01:00

128 lines
3.6 KiB
Python
Executable File

#!/bin/python
from env import *
import requests
import csv
import json
import time
# === KONFIGURATION ===
CONFIG_ID = "prod"
CSV_DATEI = "serverliste_link11.csv"
# Load Balancer Settings (eintragen oder automatisch abrufen)
PROVIDER = "link11"
REGION = "global"
LISTENER = "jtlwaap-lb-prod-443"
LISTENER_PORT = 443
BASE_URL = f"https://jtlwaap.app.reblaze.io/api/v4.3/conf/{CONFIG_ID}"
HEADERS_JSON = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
HEADERS_GET = {
"Authorization": f"Bearer {API_TOKEN}",
"Accept": "*/*"
}
def sanitize_entry_id(domain):
"""Erstellt eine gültige entry_id ohne Punkte, wie von der API gefordert."""
return domain.replace(".", "-")
def domain_to_cert_id(domain):
return f"jtlwaap-{sanitize_entry_id(domain)}"
def create_certificate(domain):
cert_id = domain_to_cert_id(domain)
url = f"{BASE_URL}/certificates/{cert_id}"
params = {
"domains": [
domain,
"www."+domain
]
}
payload = {
"id": cert_id,
"le_auto_renew": True,
"le_auto_replace": True,
"le_hash": "",
"provider_links": []
}
#return cert_id
response = requests.post(url, headers=HEADERS_JSON, params=params, data=json.dumps(payload))
if response.status_code == 201:
print(f"[✓] Zertifikat erstellt: {cert_id}")
return cert_id
elif response.status_code == 409:
print(f"[i] Zertifikat bereits vorhanden: {cert_id}")
return cert_id
else:
print(f"[✗] Fehler bei Zertifikat {cert_id}: {response.status_code}{response.text}")
return None
def update_server_group(domain, cert_id):
entry_id = sanitize_entry_id(domain)
url = f"{BASE_URL}/server-groups/{entry_id}"
# Servergruppe einlesen
response = requests.get(url, headers=HEADERS_JSON,)
target=json.loads(response.text)
# Neues Zertifikat einstellen
target['ssl_certificate'] = cert_id
# Servergruppe speichern
response = requests.put(url, headers=HEADERS_JSON, data=json.dumps(target))
if response.status_code == 200:
print(f"[✓] Servergruppe aktualisiert für {domain}")
else:
print(f"[✗] Fehler bei Servergruppe {domain}: {response.status_code}{response.text}")
def assign_to_load_balancer(domain, cert_id):
url = f"https://jtlwaap.app.reblaze.io/api/v4.3/conf/{CONFIG_ID}/load-balancers/{LISTENER}/certificates/{cert_id}"
params = {
"provider": PROVIDER,
"region": REGION,
"listener": LISTENER,
"listener-port": LISTENER_PORT
}
response = requests.put(url, headers=HEADERS_JSON, params=params)
if response.status_code == 200:
print(f"[✓] Zertifikat dem Load Balancer zugewiesen: {cert_id}")
else:
print(f"[✗] Fehler beim Load Balancer für {cert_id}: {response.status_code}{response.text}")
def verarbeite_domain(domain):
print(f"\n--- Bearbeite Domain: {domain} ---")
cert_id = create_certificate(domain)
if not cert_id:
return
time.sleep(2) # Wartezeit, falls Zertifikaterstellung async dauert
update_server_group(domain, cert_id)
#time.sleep(5)
assign_to_load_balancer(domain, cert_id)
def main():
try:
with open(CSV_DATEI, newline='') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
domain = row[0].strip()
if domain:
verarbeite_domain(domain)
except Exception as e:
print(f"[!] Fehler beim Lesen der CSV: {e}")
if __name__ == "__main__":
main()