"""
Admin Panel API Service

Handles communication with the Admin Panel API to fetch CF accounts.
"""

from typing import Dict, List, Optional

import requests

from app.config import Config


class AdminAPIService:
    """Service to communicate with the Admin Panel API.

    Every public method returns a dict and never raises. Any failure
    (timeout, connection problem, unexpected HTTP status, unusable response
    body) is reported as ``{'status': 'error', 'error': <message>, ...}``.
    """

    # Seconds to wait for the Admin API before a request is abandoned.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.base_url = Config.ADMIN_API_URL
        self.api_key = Config.ADMIN_API_INTERNAL_KEY
        self.headers = {
            'X-Internal-API-Key': self.api_key,
            'Content-Type': 'application/json',
        }

    @staticmethod
    def _error(message: str, extras: Optional[Dict] = None) -> Dict:
        """Build an error result, appending any endpoint-specific keys."""
        result = {'status': 'error', 'error': message}
        if extras:
            result.update(extras)
        return result

    def _request(
        self,
        send,
        path: str,
        status_errors: Optional[Dict] = None,
        error_extras: Optional[Dict] = None,
    ) -> Dict:
        """Perform one Admin API call and translate the outcome into a dict.

        Args:
            send: The ``requests`` function to call (``requests.get`` or
                ``requests.post``). It is passed in already resolved so the
                public methods keep calling the same functions as before.
            path: Endpoint path starting with ``/``.
            status_errors: Optional mapping of HTTP status code to a fixed
                error message, checked before the generic fallback message.
            error_extras: Extra keys merged into every error result.

        Returns:
            The decoded JSON object on HTTP 200, otherwise an error dict.
        """
        try:
            # Strip a trailing slash so a base URL configured as
            # "http://host/" does not produce "http://host//api/...".
            url = f"{str(self.base_url).rstrip('/')}{path}"
            response = send(
                url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 200:
                payload = response.json()
                # Callers index the result by key, so a JSON list, null or
                # scalar is reported as an error instead of being returned.
                if not isinstance(payload, dict):
                    return self._error(
                        'Admin API returned an unexpected response body',
                        error_extras,
                    )
                return payload

            if status_errors and response.status_code in status_errors:
                return self._error(
                    status_errors[response.status_code], error_extras
                )

            return self._error(
                f'Admin API returned {response.status_code}: {response.text}',
                error_extras,
            )
        # Timeout is handled before ConnectionError, as in the original
        # handlers, so an exception matching both is reported as a timeout.
        except requests.exceptions.Timeout:
            return self._error('Admin API request timeout', error_extras)
        except requests.exceptions.ConnectionError:
            return self._error('Cannot connect to Admin API', error_extras)
        except Exception as e:
            # Boundary handler: this service reports failures, never raises.
            return self._error(f'Admin API error: {str(e)}', error_extras)

    def get_available_cf_accounts(self) -> Dict:
        """
        Fetch available CF accounts from Admin Panel

        Returns:
            {
                "status": "success" | "error",
                "accounts": [...],
                "total": int,
                "error": str (if error)
            }
            Error results always carry ``accounts == []`` and ``total == 0``.
        """
        return self._request(
            requests.get,
            '/api/cf-accounts/internal/available',
            # Built per call so each result owns its own (mutable) list.
            error_extras={'accounts': [], 'total': 0},
        )

    def get_cf_account(self, account_id: int) -> Dict:
        """
        Fetch specific CF account with API token from Admin Panel

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "account": {...},
                "error": str (if error)
            }
            HTTP 404 and 403 are mapped to dedicated error messages.
        """
        return self._request(
            requests.get,
            f'/api/cf-accounts/internal/{account_id}',
            status_errors={
                404: 'CF account not found',
                403: 'CF account not available for verification',
            },
        )

    def increment_domain_count(self, account_id: int) -> Dict:
        """
        Increment domain count for CF account in Admin Panel
        Called after successfully creating a domain

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "message": str,
                "new_count": int,
                "available_capacity": int,
                "error": str (if error)
            }
        """
        return self._request(
            requests.post,
            f'/api/cf-accounts/internal/{account_id}/increment',
        )

    def decrement_domain_count(self, account_id: int) -> Dict:
        """
        Decrement domain count for CF account in Admin Panel
        Called after successfully deleting a domain

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "message": str,
                "new_count": int,
                "available_capacity": int,
                "error": str (if error)
            }
        """
        return self._request(
            requests.post,
            f'/api/cf-accounts/internal/{account_id}/decrement',
        )