hosting-platform/backend/app/services/nameserver_service.py

174 lines
5.3 KiB
Python
Raw Normal View History

2026-01-11 14:38:39 +00:00
"""
Nameserver kontrolü ve doğrulama servisi
"""
import dns.resolver
from typing import Dict, List, Optional
import CloudFlare
class NameserverService:
    """Look up a domain's NS records and check whether they point at Cloudflare.

    All methods are static and report their outcome as plain dictionaries
    with a ``"status"`` key instead of raising, so callers can forward the
    result directly (for example as an API response).
    """

    # Cloudflare-owned nameserver hostnames that are accepted by exact match.
    CLOUDFLARE_NAMESERVERS: List[str] = [
        "ns1.cloudflare.com",
        "ns2.cloudflare.com",
        "ns3.cloudflare.com",
        "ns4.cloudflare.com",
        "ns5.cloudflare.com",
        "ns6.cloudflare.com",
        "ns7.cloudflare.com",
    ]

    # Nameservers that Cloudflare assigns to customer zones are named
    # "<name>.ns.cloudflare.com" (e.g. "ada.ns.cloudflare.com"), so they are
    # recognised by this suffix rather than by a fixed list.
    _CLOUDFLARE_NS_SUFFIX = ".ns.cloudflare.com"

    @staticmethod
    def _is_cloudflare_nameserver(hostname: str) -> bool:
        """Return True if ``hostname`` is a Cloudflare nameserver.

        The comparison ignores case, surrounding whitespace and a trailing
        root dot. A hostname matches when it is one of
        ``CLOUDFLARE_NAMESERVERS`` or ends with ``.ns.cloudflare.com``.
        Matching is anchored to the whole name or its suffix on purpose: a
        plain substring test would accept look-alike hosts such as
        ``ns1.cloudflare.com.attacker.net``.
        """
        normalized = hostname.strip().rstrip(".").lower()
        return (
            normalized in NameserverService.CLOUDFLARE_NAMESERVERS
            or normalized.endswith(NameserverService._CLOUDFLARE_NS_SUFFIX)
        )

    @staticmethod
    def get_current_nameservers(domain: str) -> Dict:
        """Query the NS records currently published for ``domain``.

        Args:
            domain: Domain name to look up.

        Returns:
            On success::

                {"status": "success", "nameservers": [...], "count": int}

            On failure::

                {"status": "error", "message": str,
                 "nameservers": [], "count": 0}

            Nameserver hostnames are returned without the trailing root dot.
            No exception is propagated; every failure is reported through
            the ``"error"`` result.
        """
        try:
            resolver = dns.resolver.Resolver()
            # Bound both the per-server wait and the total query time so a
            # dead resolver cannot stall the caller.
            resolver.timeout = 5
            resolver.lifetime = 5

            answers = resolver.resolve(domain, 'NS')
            nameservers = [str(rdata.target).rstrip('.') for rdata in answers]
            return {
                "status": "success",
                "nameservers": nameservers,
                "count": len(nameservers),
            }
        except dns.resolver.NXDOMAIN:
            message = f"Domain '{domain}' bulunamadı (NXDOMAIN)"
        except dns.resolver.NoAnswer:
            message = f"Domain '{domain}' için NS kaydı bulunamadı"
        except dns.resolver.Timeout:
            message = "DNS sorgusu zaman aşımına uğradı"
        except Exception as e:
            # Boundary handler: any other resolver failure is reported to the
            # caller as an error result rather than raised.
            message = f"NS sorgu hatası: {str(e)}"

        return {
            "status": "error",
            "message": message,
            "nameservers": [],
            "count": 0,
        }

    @staticmethod
    def check_cloudflare_nameservers(domain: str) -> Dict:
        """Check whether the NS records of ``domain`` point at Cloudflare.

        Args:
            domain: Domain name to check.

        Returns:
            A dictionary of the form::

                {
                    "status": "success" | "partial" | "error",
                    "is_cloudflare": bool,
                    "current_nameservers": [...],
                    "cloudflare_nameservers": [...],
                    "other_nameservers": [...],
                    "message": str,
                }

            ``"success"`` means every nameserver is a Cloudflare one,
            ``"partial"`` means only some are, and ``"error"`` means either
            none are or the NS lookup itself failed (in which case all lists
            are empty and ``message`` carries the lookup error).
        """
        lookup = NameserverService.get_current_nameservers(domain)
        if lookup["status"] == "error":
            return {
                "status": "error",
                "is_cloudflare": False,
                "current_nameservers": [],
                "cloudflare_nameservers": [],
                "other_nameservers": [],
                "message": lookup["message"],
            }

        current_ns = lookup["nameservers"]

        # Split the published nameservers into Cloudflare and non-Cloudflare.
        cf_ns_found = []
        other_ns_found = []
        for ns in current_ns:
            if NameserverService._is_cloudflare_nameserver(ns):
                cf_ns_found.append(ns)
            else:
                other_ns_found.append(ns)

        all_cloudflare = bool(cf_ns_found) and not other_ns_found
        partial_cloudflare = bool(cf_ns_found) and bool(other_ns_found)

        if all_cloudflare:
            status = "success"
            message = f"✅ Tüm nameserver'lar Cloudflare'e yönlendirilmiş ({len(cf_ns_found)} NS)"
        elif partial_cloudflare:
            status = "partial"
            message = f"⚠️ Bazı nameserver'lar Cloudflare'e yönlendirilmiş ({len(cf_ns_found)}/{len(current_ns)})"
        else:
            status = "error"
            message = "❌ Nameserver'lar henüz Cloudflare'e yönlendirilmemiş"

        return {
            "status": status,
            "is_cloudflare": all_cloudflare,
            "current_nameservers": current_ns,
            "cloudflare_nameservers": cf_ns_found,
            "other_nameservers": other_ns_found,
            "message": message,
        }

    @staticmethod
    def get_cloudflare_zone_nameservers(zone_id: str, api_token: str) -> Dict:
        """Fetch the nameservers listed on a Cloudflare zone.

        Args:
            zone_id: Identifier of the Cloudflare zone.
            api_token: Cloudflare API token used to authenticate the request.

        Returns:
            On success::

                {"status": "success", "nameservers": [...], "count": int}

            On failure::

                {"status": "error", "message": str,
                 "nameservers": [], "count": 0}

            No exception is propagated; API and client errors are reported
            through the ``"error"`` result.
        """
        try:
            cf = CloudFlare.CloudFlare(token=api_token)
            zone = cf.zones.get(zone_id)
            # Treat a missing or null "name_servers" field as an empty list.
            nameservers = zone.get("name_servers") or []
            return {
                "status": "success",
                "nameservers": nameservers,
                "count": len(nameservers),
            }
        except Exception as e:
            # Boundary handler: surface any failure as an error result.
            return {
                "status": "error",
                "message": f"Cloudflare zone NS sorgu hatası: {str(e)}",
                "nameservers": [],
                "count": 0,
            }