ip resolve local over whois

This commit is contained in:
Thomas Ciesla
2026-01-09 14:08:17 +01:00
parent 80fa35fb74
commit 682cc5402a
2 changed files with 184 additions and 5 deletions

View File

@@ -1045,6 +1045,10 @@ _ip_info_cache: Dict[str, Dict[str, Any]] = {}
_ip_api_last_request = 0.0 _ip_api_last_request = 0.0
_ip_api_request_count = 0 _ip_api_request_count = 0
# WHOIS Cache (1 Stunde TTL)
WHOIS_CACHE_TTL = 3600 # 1 Stunde
_whois_cache: Dict[str, Dict[str, Any]] = {}
# Global Country Ranges Cache (loaded once from ipdeny.com files) # Global Country Ranges Cache (loaded once from ipdeny.com files)
_country_ranges_cache: Dict[str, List[tuple]] = {} # country -> [(network_int, mask_int), ...] _country_ranges_cache: Dict[str, List[tuple]] = {} # country -> [(network_int, mask_int), ...]
_country_cache_loaded = False _country_cache_loaded = False
@@ -1202,6 +1206,112 @@ def get_cached_ip_info(ip: str) -> Optional[Dict[str, Any]]:
return _ip_info_cache.get(ip) return _ip_info_cache.get(ip)
def whois_lookup(ip: str) -> Dict[str, Any]:
"""
Führt WHOIS-Lookup für eine IP durch (lokales whois-Tool).
Cached Ergebnisse für 1 Stunde.
Returns:
Dict mit: netname, org, asn, country, abuse, range, raw
"""
global _whois_cache
# Cache prüfen
if ip in _whois_cache:
cached = _whois_cache[ip]
if time.time() - cached.get('_cached_at', 0) < WHOIS_CACHE_TTL:
return cached
result = {
'ip': ip,
'netname': '',
'org': '',
'asn': '',
'country': '',
'abuse': '',
'range': '',
'descr': '',
'raw': '',
'_cached_at': time.time()
}
try:
# WHOIS-Kommando ausführen
proc = subprocess.run(
['whois', ip],
capture_output=True,
text=True,
timeout=10
)
if proc.returncode != 0:
logger.debug(f"WHOIS fehlgeschlagen für {ip}: {proc.stderr}")
_whois_cache[ip] = result
return result
raw_output = proc.stdout
result['raw'] = raw_output
# Parsing - verschiedene WHOIS-Formate unterstützen (RIPE, ARIN, APNIC, etc.)
lines = raw_output.split('\n')
for line in lines:
line_lower = line.lower().strip()
# Netname
if line_lower.startswith('netname:'):
result['netname'] = line.split(':', 1)[1].strip()
# Organisation
elif line_lower.startswith('org-name:') or line_lower.startswith('orgname:'):
result['org'] = line.split(':', 1)[1].strip()
elif line_lower.startswith('organization:') and not result['org']:
result['org'] = line.split(':', 1)[1].strip()
elif line_lower.startswith('descr:') and not result['org']:
# Fallback zu descr wenn keine org gefunden
val = line.split(':', 1)[1].strip()
if val and not result['descr']:
result['descr'] = val
# ASN
elif line_lower.startswith('origin:') or line_lower.startswith('originas:'):
result['asn'] = line.split(':', 1)[1].strip()
# Country
elif line_lower.startswith('country:') and not result['country']:
result['country'] = line.split(':', 1)[1].strip().upper()
# Abuse Contact
elif 'abuse' in line_lower and '@' in line:
# E-Mail aus der Zeile extrahieren
import re
email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', line)
if email_match and not result['abuse']:
result['abuse'] = email_match.group(0)
# IP Range (inetnum/NetRange)
elif line_lower.startswith('inetnum:') or line_lower.startswith('netrange:'):
result['range'] = line.split(':', 1)[1].strip()
elif line_lower.startswith('cidr:') and not result['range']:
result['range'] = line.split(':', 1)[1].strip()
# Fallback: descr als org wenn org leer
if not result['org'] and result['descr']:
result['org'] = result['descr']
logger.debug(f"WHOIS für {ip}: netname={result['netname']}, org={result['org']}, asn={result['asn']}")
except subprocess.TimeoutExpired:
logger.warning(f"WHOIS Timeout für {ip}")
except FileNotFoundError:
logger.error("WHOIS-Tool nicht installiert! Bitte 'whois' installieren.")
except Exception as e:
logger.error(f"WHOIS Fehler für {ip}: {e}")
_whois_cache[ip] = result
return result
# ============================================================================= # =============================================================================
# LIVE STATISTICS TRACKER # LIVE STATISTICS TRACKER
# ============================================================================= # =============================================================================
@@ -3889,6 +3999,9 @@ class JTLWAFiAgent:
elif event_type == 'command.autoban_config': elif event_type == 'command.autoban_config':
await self._handle_autoban_config_command(event_data) await self._handle_autoban_config_command(event_data)
elif event_type == 'command.whois':
await self._handle_whois_command(event_data)
elif event_type == 'log.subscribe': elif event_type == 'log.subscribe':
shop = event_data.get('shop') shop = event_data.get('shop')
if shop: if shop:
@@ -4376,6 +4489,47 @@ class JTLWAFiAgent:
'message': str(e) 'message': str(e)
}) })
async def _handle_whois_command(self, data: Dict[str, Any]):
"""Führt WHOIS-Lookup für eine IP durch und sendet Ergebnis."""
command_id = data.get('command_id', 'unknown')
ip = data.get('ip')
if not ip:
await self._send_event('whois_result', {
'command_id': command_id,
'success': False,
'error': 'Keine IP angegeben'
})
return
try:
# WHOIS-Lookup durchführen (wird gecacht)
result = whois_lookup(ip)
# Ergebnis senden (ohne raw für kleinere Payload)
await self._send_event('whois_result', {
'command_id': command_id,
'success': True,
'ip': ip,
'netname': result.get('netname', ''),
'org': result.get('org', ''),
'asn': result.get('asn', ''),
'country': result.get('country', ''),
'abuse': result.get('abuse', ''),
'range': result.get('range', '')
})
logger.info(f"WHOIS für {ip}: {result.get('org', 'Unknown')} ({result.get('asn', 'N/A')})")
except Exception as e:
logger.error(f"WHOIS Fehler für {ip}: {e}")
await self._send_event('whois_result', {
'command_id': command_id,
'success': False,
'ip': ip,
'error': str(e)
})
async def _periodic_tasks(self): async def _periodic_tasks(self):
"""Führt periodische Tasks aus.""" """Führt periodische Tasks aus."""
while self.running: while self.running:

File diff suppressed because one or more lines are too long