# Source: hosting-platform/backend/app/services/admin_api_service.py
# (Repository viewer metadata: 210 lines, 6.5 KiB, Python)

"""
Admin Panel API Service
Handles communication with Admin Panel API to fetch CF accounts
"""
import requests
from typing import Dict, List, Optional
from app.config import Config
class AdminAPIService:
    """Service to communicate with Admin Panel API.

    All public methods return a dict. Request failures (timeouts, connection
    errors, non-200 responses, malformed response bodies) are reported as
    ``{"status": "error", "error": "<message>", ...}`` instead of being
    raised to the caller.
    """

    # Seconds to wait for the Admin API before reporting a timeout.
    REQUEST_TIMEOUT = 10

    # Path prefix shared by every internal CF-account endpoint.
    _CF_ACCOUNTS_PATH = '/api/cf-accounts/internal'

    def __init__(self):
        """Read the Admin API base URL and internal key from ``Config``."""
        self.base_url = Config.ADMIN_API_URL
        self.api_key = Config.ADMIN_API_INTERNAL_KEY
        self.headers = {
            'X-Internal-API-Key': self.api_key,
            'Content-Type': 'application/json'
        }

    def get_available_cf_accounts(self) -> Dict:
        """
        Fetch available CF accounts from Admin Panel

        Returns:
            {
                "status": "success" | "error",
                "accounts": [...],
                "total": int,
                "error": str (if error)
            }

            On error, "accounts" is an empty list and "total" is 0.
        """
        return self._request(
            'get',
            f'{self._CF_ACCOUNTS_PATH}/available',
            # A fresh dict/list per call, so callers may mutate the result.
            error_extra={'accounts': [], 'total': 0},
        )

    def get_cf_account(self, account_id: int) -> Dict:
        """
        Fetch specific CF account with API token from Admin Panel

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "account": {...},
                "error": str (if error)
            }

            HTTP 404 and 403 responses are mapped to dedicated error
            messages; an invalid ``account_id`` is rejected without
            contacting the Admin API.
        """
        return self._account_request(
            'get',
            account_id,
            status_errors={
                404: 'CF account not found',
                403: 'CF account not available for verification',
            },
        )

    def increment_domain_count(self, account_id: int) -> Dict:
        """
        Increment domain count for CF account in Admin Panel
        Called after successfully creating a domain

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "message": str,
                "new_count": int,
                "available_capacity": int,
                "error": str (if error)
            }
        """
        return self._account_request('post', account_id, action='increment')

    def decrement_domain_count(self, account_id: int) -> Dict:
        """
        Decrement domain count for CF account in Admin Panel
        Called after successfully deleting a domain

        Args:
            account_id: CF account ID

        Returns:
            {
                "status": "success" | "error",
                "message": str,
                "new_count": int,
                "available_capacity": int,
                "error": str (if error)
            }
        """
        return self._account_request('post', account_id, action='decrement')

    def _account_request(
        self,
        http_method: str,
        account_id,
        action: str = '',
        status_errors: Optional[Dict] = None,
    ) -> Dict:
        """Validate *account_id*, then call its endpoint (or a sub-action of it)."""
        # Validate first: the ID becomes a URL path segment, so a bad value
        # would either waste a round trip or address a different endpoint.
        if not self._is_valid_account_id(account_id):
            return self._error_result(f'Invalid CF account ID: {account_id!r}')

        path = f'{self._CF_ACCOUNTS_PATH}/{account_id}'
        if action:
            path = f'{path}/{action}'
        return self._request(http_method, path, status_errors=status_errors)

    def _request(
        self,
        http_method: str,
        path: str,
        status_errors: Optional[Dict] = None,
        error_extra: Optional[Dict] = None,
    ) -> Dict:
        """Send one request to the Admin API and normalise the outcome to a dict.

        Args:
            http_method: Name of the ``requests`` function to use
                ('get' or 'post').
            path: Endpoint path, starting with '/'.
            status_errors: Optional mapping of HTTP status code to the error
                message to report for that status.
            error_extra: Optional extra keys merged into every error result.

        Returns:
            The decoded JSON object on HTTP 200, otherwise an error dict.
        """
        try:
            # Strip a trailing slash so a base URL such as "http://host/"
            # does not produce "http://host//api/...".
            url = f'{self.base_url}'.rstrip('/') + path
            send = getattr(requests, http_method)
            response = send(
                url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code != 200:
                message = (status_errors or {}).get(response.status_code)
                if message is None:
                    message = (
                        f'Admin API returned {response.status_code}: '
                        f'{response.text}'
                    )
                return self._error_result(message, error_extra)

            # Keep this try tight: several requests exceptions raised while
            # *sending* (e.g. invalid URL) are ValueError subclasses too and
            # must not be misreported as a JSON problem.
            try:
                payload = response.json()
            except ValueError:
                return self._error_result(
                    'Admin API returned invalid JSON', error_extra
                )

            # Callers are promised a dict; reject lists, null, scalars.
            if not isinstance(payload, dict):
                return self._error_result(
                    'Admin API returned an unexpected response format',
                    error_extra,
                )
            return payload
        # Timeout is checked before ConnectionError on purpose: a connect
        # timeout derives from both and should be reported as a timeout.
        except requests.exceptions.Timeout:
            return self._error_result('Admin API request timeout', error_extra)
        except requests.exceptions.ConnectionError:
            return self._error_result('Cannot connect to Admin API', error_extra)
        except Exception as e:
            # Boundary handler: callers expect an error dict, not an exception.
            return self._error_result(f'Admin API error: {str(e)}', error_extra)

    @staticmethod
    def _is_valid_account_id(account_id) -> bool:
        """Return True if *account_id* can safely be used as one URL path segment."""
        # bool is a subclass of int, but True/False are never real IDs.
        if account_id is None or isinstance(account_id, bool):
            return False
        text = str(account_id)
        if not text or text in ('.', '..'):
            return False
        # These characters would change which path/query the request targets.
        return not any(ch in '/?#\\' or ch.isspace() for ch in text)

    @staticmethod
    def _error_result(message: str, extra: Optional[Dict] = None) -> Dict:
        """Build the standard error dict, with optional extra keys appended."""
        result = {'status': 'error', 'error': message}
        if extra:
            result.update(extra)
        return result