hosting-platform/backend/app/services/auth_service.py

234 lines
6.4 KiB
Python

"""
Authentication service - JWT token generation and validation
"""
import jwt
import secrets
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify, current_app
from app.models.user import User, Customer
from app.models.domain import db
class AuthService:
"""Authentication service for JWT tokens"""
@staticmethod
def generate_token(user_id, role='customer', expires_in=24):
"""
Generate JWT token
Args:
user_id: User ID
role: User role (customer/admin)
expires_in: Token expiration in hours (default 24)
Returns:
JWT token string
"""
payload = {
'user_id': user_id,
'role': role,
'exp': datetime.utcnow() + timedelta(hours=expires_in),
'iat': datetime.utcnow()
}
token = jwt.encode(
payload,
current_app.config['SECRET_KEY'],
algorithm='HS256'
)
return token
@staticmethod
def verify_token(token):
"""
Verify JWT token
Args:
token: JWT token string
Returns:
dict: Decoded payload or None if invalid
"""
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
@staticmethod
def register_user(email, password, full_name, company_name=None):
"""
Register new user
Args:
email: User email
password: User password
full_name: User full name
company_name: Optional company name
Returns:
tuple: (user, customer, error)
"""
# Check if user exists
existing_user = User.query.filter_by(email=email).first()
if existing_user:
return None, None, "Email already registered"
# Create user
user = User(
email=email,
full_name=full_name,
role='customer',
is_active=True,
is_verified=False,
verification_token=secrets.token_urlsafe(32)
)
user.set_password(password)
db.session.add(user)
db.session.flush() # Get user.id
# Create customer profile
customer = Customer(
user_id=user.id,
company_name=company_name,
subscription_plan='free',
subscription_status='active',
max_domains=5,
max_containers=3
)
db.session.add(customer)
db.session.commit()
return user, customer, None
@staticmethod
def login_user(email, password):
"""
Login user
Args:
email: User email
password: User password
Returns:
tuple: (user, token, error)
"""
user = User.query.filter_by(email=email).first()
if not user:
return None, None, "Invalid email or password"
if not user.check_password(password):
return None, None, "Invalid email or password"
if not user.is_active:
return None, None, "Account is deactivated"
# Update last login
user.last_login = datetime.utcnow()
db.session.commit()
# Generate token
token = AuthService.generate_token(user.id, user.role)
return user, token, None
@staticmethod
def get_current_user(token):
"""
Get current user from token
Args:
token: JWT token
Returns:
User object or None
"""
payload = AuthService.verify_token(token)
if not payload:
return None
user = User.query.get(payload['user_id'])
return user
# Decorators for route protection
def token_required(f):
"""Decorator to require valid JWT token"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Get token from header
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(' ')[1] # Bearer <token>
except IndexError:
return jsonify({'error': 'Invalid token format'}), 401
if not token:
return jsonify({'error': 'Token is missing'}), 401
# Verify token
payload = AuthService.verify_token(token)
if not payload:
return jsonify({'error': 'Token is invalid or expired'}), 401
# Get user
current_user = User.query.get(payload['user_id'])
if not current_user or not current_user.is_active:
return jsonify({'error': 'User not found or inactive'}), 401
return f(current_user, *args, **kwargs)
return decorated
def admin_required(f):
"""Decorator to require admin role"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Get token from header
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(' ')[1]
except IndexError:
return jsonify({'error': 'Invalid token format'}), 401
if not token:
return jsonify({'error': 'Token is missing'}), 401
# Verify token
payload = AuthService.verify_token(token)
if not payload:
return jsonify({'error': 'Token is invalid or expired'}), 401
# Check admin role
if payload.get('role') != 'admin':
return jsonify({'error': 'Admin access required'}), 403
# Get user
current_user = User.query.get(payload['user_id'])
if not current_user or not current_user.is_active:
return jsonify({'error': 'User not found or inactive'}), 401
return f(current_user, *args, **kwargs)
return decorated