Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner🔵 Advanced
REST API BasicsHTTP RequestsStatus CodesJSON SerializationError HandlingAPI AuthenticationRate LimitingBuilding APIsWeb Scraping Basics
Python/Apis Json/Api Authentication

🔐 Advanced API Authentication — Secure Access Control

Enterprise systems require sophisticated authentication. Master JWT tokens, OAuth 2.0 flows, SAML integration, mutual TLS, and implement industry-standard security patterns.


🎫 JWT (JSON Web Tokens) Deep Dive

import jwt
from datetime import datetime, timedelta
from typing import Dict, Any

class JWTManager:
    """Manage JWT tokens for API authentication"""

    def __init__(self, secret_key: str, algorithm: str = 'HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_token(self, data: Dict[str, Any], expires_in_hours: int = 24) -> str:
        """Create JWT token"""

        payload = data.copy()

        # Add expiration
        payload['exp'] = datetime.utcnow() + timedelta(hours=expires_in_hours)
        payload['iat'] = datetime.utcnow()

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Verify and decode JWT"""

        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload

        except jwt.ExpiredSignatureError:
            raise ValueError('Token expired')

        except jwt.InvalidTokenError:
            raise ValueError('Invalid token')

    def refresh_token(self, token: str) -> str:
        """Create new token from existing one"""

        payload = self.verify_token(token)

        # Remove exp and iat
        payload.pop('exp', None)
        payload.pop('iat', None)

        return self.create_token(payload)

# Usage
manager = JWTManager(secret_key='your-secret-key-here')

# Create token
token = manager.create_token({
    'user_id': 123,
    'username': 'alice',
    'roles': ['user', 'admin']
})

# Verify token
payload = manager.verify_token(token)
print(f"User: {payload['username']}")

# Refresh token
new_token = manager.refresh_token(token)

🔄 OAuth 2.0 Refresh Token Flow

import requests
from datetime import datetime, timedelta

class OAuth2TokenManager:
    """Manage OAuth 2.0 tokens with refresh"""

    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url

        self.access_token = None
        self.refresh_token = None
        self.expires_at = None

    def request_tokens(self, code: str, redirect_uri: str) -> bool:
        """Exchange authorization code for tokens"""

        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        response = requests.post(self.token_url, data=data)

        if response.status_code != 200:
            return False

        token_data = response.json()

        self.access_token = token_data['access_token']
        self.refresh_token = token_data['refresh_token']
        self.expires_at = datetime.utcnow() + timedelta(seconds=token_data['expires_in'])

        return True

    def refresh_access_token(self) -> bool:
        """Use refresh token to get new access token"""

        data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        response = requests.post(self.token_url, data=data)

        if response.status_code != 200:
            return False

        token_data = response.json()

        self.access_token = token_data['access_token']
        self.expires_at = datetime.utcnow() + timedelta(seconds=token_data['expires_in'])

        return True

    def get_valid_token(self) -> str:
        """Get access token, refreshing if needed"""

        if datetime.utcnow() >= self.expires_at:
            self.refresh_access_token()

        return self.access_token

    def make_request(self, method: str, url: str, **kwargs) -> requests.Response:
        """Make API request with valid token"""

        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f'Bearer {self.get_valid_token()}'

        return requests.request(method, url, headers=headers, **kwargs)

# Usage
oauth = OAuth2TokenManager(
    client_id='your_client_id',
    client_secret='your_client_secret',
    token_url='https://provider.com/oauth/token'
)

# Step 1: User authorizes, get code
code = 'authorization_code_from_user'
oauth.request_tokens(code, 'https://yourapp.com/callback')

# Step 2: Make authenticated request
response = oauth.make_request('GET', 'https://api.provider.com/user')

🔒 Mutual TLS (mTLS) Authentication

import requests
from requests.auth import HTTPCertAuth

def make_mtls_request(url, client_cert, client_key, ca_cert=None):
    """Make request with mutual TLS"""

    # Verify both client and server certificates
    session = requests.Session()

    # Client certificate
    session.cert = (client_cert, client_key)

    # CA certificate for server verification
    if ca_cert:
        session.verify = ca_cert
    else:
        session.verify = True

    response = session.get(url)

    return response

# Usage
response = make_mtls_request(
    'https://api.example.com/secure',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key',
    ca_cert='/path/to/ca.crt'
)

print(response.json())

🛡️ API Key Rotation

from typing import List, Optional
from datetime import datetime, timedelta

class APIKeyRotationManager:
    """Manage API key rotation for security"""

    def __init__(self):
        self.keys = {}  # user_id -> {key, created, rotated_at}

    def generate_key(self, user_id: int) -> str:
        """Generate new API key"""
        import secrets

        # Generate secure random key
        key = secrets.token_urlsafe(32)

        self.keys[user_id] = {
            'key': key,
            'created': datetime.utcnow(),
            'rotated_at': datetime.utcnow(),
            'active': True
        }

        return key

    def rotate_key(self, user_id: int, old_key: str) -> str:
        """Rotate API key"""

        if user_id not in self.keys:
            raise ValueError('User not found')

        if self.keys[user_id]['key'] != old_key:
            raise ValueError('Invalid key')

        # Generate new key
        new_key = self.generate_key(user_id)

        return new_key

    def validate_key(self, user_id: int, key: str) -> bool:
        """Validate API key"""

        if user_id not in self.keys:
            return False

        user_key = self.keys[user_id]

        if not user_key['active']:
            return False

        # Check age - rotate if older than 90 days
        if datetime.utcnow() - user_key['rotated_at'] > timedelta(days=90):
            return False

        return user_key['key'] == key

# Usage
manager = APIKeyRotationManager()

# Generate initial key
api_key = manager.generate_key(user_id=123)

# Validate key
if manager.validate_key(123, api_key):
    print("Key valid")

# Rotate key after 90 days
new_key = manager.rotate_key(123, api_key)

🔑 Scope-Based Access Control

from typing import Set

class ScopedTokenManager:
    """Manage OAuth scopes for fine-grained permissions"""

    AVAILABLE_SCOPES = {
        'read:users': 'Read user data',
        'write:users': 'Modify user data',
        'read:posts': 'Read posts',
        'write:posts': 'Create/edit posts',
        'admin:delete': 'Delete resources',
        'admin:users': 'Manage users'
    }

    def __init__(self):
        self.tokens = {}  # token_id -> {scopes, user_id, expires_at}

    def create_token(self, user_id: int, scopes: Set[str]) -> str:
        """Create token with specific scopes"""

        # Validate scopes
        invalid = scopes - set(self.AVAILABLE_SCOPES.keys())
        if invalid:
            raise ValueError(f'Invalid scopes: {invalid}')

        import secrets
        token_id = secrets.token_hex(16)

        self.tokens[token_id] = {
            'user_id': user_id,
            'scopes': scopes,
            'created': datetime.utcnow(),
            'expires_at': datetime.utcnow() + timedelta(days=30)
        }

        return token_id

    def has_scope(self, token_id: str, required_scope: str) -> bool:
        """Check if token has required scope"""

        if token_id not in self.tokens:
            return False

        token = self.tokens[token_id]

        if datetime.utcnow() >= token['expires_at']:
            return False

        return required_scope in token['scopes']

    def get_scopes(self, token_id: str) -> Set[str]:
        """Get token scopes"""

        if token_id not in self.tokens:
            return set()

        return self.tokens[token_id]['scopes']

# Usage with Flask
from flask import Flask, request, jsonify

app = Flask(__name__)
scope_manager = ScopedTokenManager()

@app.route('/admin/users', methods=['DELETE'])
def delete_user():
    """Protected endpoint requiring admin scope"""

    token = request.headers.get('Authorization', '').replace('Bearer ', '')

    if not scope_manager.has_scope(token, 'admin:delete'):
        return jsonify({'error': 'Insufficient permissions'}), 403

    # Proceed with delete
    return jsonify({'status': 'deleted'})

🔍 Rate Limiting Per API Key

from collections import defaultdict
from datetime import datetime, timedelta

class APIKeyRateLimiter:
    """Rate limit per API key"""

    def __init__(self, requests_per_hour: int = 1000):
        self.requests_per_hour = requests_per_hour
        self.requests = defaultdict(list)

    def is_allowed(self, api_key: str) -> bool:
        """Check if request is allowed"""

        now = datetime.utcnow()

        # Clean old requests (older than 1 hour)
        self.requests[api_key] = [
            req_time for req_time in self.requests[api_key]
            if now - req_time < timedelta(hours=1)
        ]

        # Check if under limit
        if len(self.requests[api_key]) < self.requests_per_hour:
            self.requests[api_key].append(now)
            return True

        return False

    def get_remaining(self, api_key: str) -> int:
        """Get remaining requests"""

        return max(0, self.requests_per_hour - len(self.requests[api_key]))

# Flask integration
from flask import Flask

app = Flask(__name__)
limiter = APIKeyRateLimiter(requests_per_hour=100)

@app.before_request
def check_rate_limit():
    """Check API key rate limit before each request"""

    api_key = request.headers.get('X-API-Key')

    if api_key and not limiter.is_allowed(api_key):
        remaining = limiter.get_remaining(api_key)
        return jsonify({
            'error': 'Rate limit exceeded',
            'remaining': remaining
        }), 429

    # Store remaining in response headers
    if api_key:
        g.remaining_requests = limiter.get_remaining(api_key)

✅ Key Takeaways

MethodSecurityUse Case
JWTMediumStateless, mobile apps
OAuth 2.0HighThird-party access
mTLSVery HighService-to-service
API KeysLowPublic APIs, simple
SAMLHighEnterprise SSO
Key RotationHighCompliance
ScopesHighFine-grained access

🔗 What's Next?

Learn advanced rate limiting and throttling strategies.

Next: Advanced Rate Limiting →


Ready for advanced challenges? Try advanced challenges


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

PythonFastapiReactJSCloud

© 2026 Ojasa Mirai. All rights reserved.

TwitterGitHubLinkedIn