
Python
Enterprise systems require sophisticated authentication. Master JWT tokens, OAuth 2.0 flows, SAML integration, mutual TLS, and implement industry-standard security patterns.
import jwt
from datetime import datetime, timedelta
from typing import Dict, Any
class JWTManager:
"""Manage JWT tokens for API authentication"""
def __init__(self, secret_key: str, algorithm: str = 'HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(self, data: Dict[str, Any], expires_in_hours: int = 24) -> str:
"""Create JWT token"""
payload = data.copy()
# Add expiration
payload['exp'] = datetime.utcnow() + timedelta(hours=expires_in_hours)
payload['iat'] = datetime.utcnow()
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Dict[str, Any]:
"""Verify and decode JWT"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except jwt.ExpiredSignatureError:
raise ValueError('Token expired')
except jwt.InvalidTokenError:
raise ValueError('Invalid token')
def refresh_token(self, token: str) -> str:
"""Create new token from existing one"""
payload = self.verify_token(token)
# Remove exp and iat
payload.pop('exp', None)
payload.pop('iat', None)
return self.create_token(payload)
# Usage
manager = JWTManager(secret_key='your-secret-key-here')
# Create token
token = manager.create_token({
'user_id': 123,
'username': 'alice',
'roles': ['user', 'admin']
})
# Verify token
payload = manager.verify_token(token)
print(f"User: {payload['username']}")
# Refresh token
new_token = manager.refresh_token(token)import requests
from datetime import datetime, timedelta
class OAuth2TokenManager:
"""Manage OAuth 2.0 tokens with refresh"""
def __init__(self, client_id: str, client_secret: str, token_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.access_token = None
self.refresh_token = None
self.expires_at = None
def request_tokens(self, code: str, redirect_uri: str) -> bool:
"""Exchange authorization code for tokens"""
data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': self.client_id,
'client_secret': self.client_secret
}
response = requests.post(self.token_url, data=data)
if response.status_code != 200:
return False
token_data = response.json()
self.access_token = token_data['access_token']
self.refresh_token = token_data['refresh_token']
self.expires_at = datetime.utcnow() + timedelta(seconds=token_data['expires_in'])
return True
def refresh_access_token(self) -> bool:
"""Use refresh token to get new access token"""
data = {
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': self.client_id,
'client_secret': self.client_secret
}
response = requests.post(self.token_url, data=data)
if response.status_code != 200:
return False
token_data = response.json()
self.access_token = token_data['access_token']
self.expires_at = datetime.utcnow() + timedelta(seconds=token_data['expires_in'])
return True
def get_valid_token(self) -> str:
"""Get access token, refreshing if needed"""
if datetime.utcnow() >= self.expires_at:
self.refresh_access_token()
return self.access_token
def make_request(self, method: str, url: str, **kwargs) -> requests.Response:
"""Make API request with valid token"""
headers = kwargs.pop('headers', {})
headers['Authorization'] = f'Bearer {self.get_valid_token()}'
return requests.request(method, url, headers=headers, **kwargs)
# Usage
oauth = OAuth2TokenManager(
client_id='your_client_id',
client_secret='your_client_secret',
token_url='https://provider.com/oauth/token'
)
# Step 1: User authorizes, get code
code = 'authorization_code_from_user'
oauth.request_tokens(code, 'https://yourapp.com/callback')
# Step 2: Make authenticated request
response = oauth.make_request('GET', 'https://api.provider.com/user')import requests
from requests.auth import HTTPCertAuth
def make_mtls_request(url, client_cert, client_key, ca_cert=None):
"""Make request with mutual TLS"""
# Verify both client and server certificates
session = requests.Session()
# Client certificate
session.cert = (client_cert, client_key)
# CA certificate for server verification
if ca_cert:
session.verify = ca_cert
else:
session.verify = True
response = session.get(url)
return response
# Usage
response = make_mtls_request(
'https://api.example.com/secure',
client_cert='/path/to/client.crt',
client_key='/path/to/client.key',
ca_cert='/path/to/ca.crt'
)
print(response.json())from typing import List, Optional
from datetime import datetime, timedelta
class APIKeyRotationManager:
"""Manage API key rotation for security"""
def __init__(self):
self.keys = {} # user_id -> {key, created, rotated_at}
def generate_key(self, user_id: int) -> str:
"""Generate new API key"""
import secrets
# Generate secure random key
key = secrets.token_urlsafe(32)
self.keys[user_id] = {
'key': key,
'created': datetime.utcnow(),
'rotated_at': datetime.utcnow(),
'active': True
}
return key
def rotate_key(self, user_id: int, old_key: str) -> str:
"""Rotate API key"""
if user_id not in self.keys:
raise ValueError('User not found')
if self.keys[user_id]['key'] != old_key:
raise ValueError('Invalid key')
# Generate new key
new_key = self.generate_key(user_id)
return new_key
def validate_key(self, user_id: int, key: str) -> bool:
"""Validate API key"""
if user_id not in self.keys:
return False
user_key = self.keys[user_id]
if not user_key['active']:
return False
# Check age - rotate if older than 90 days
if datetime.utcnow() - user_key['rotated_at'] > timedelta(days=90):
return False
return user_key['key'] == key
# Usage
manager = APIKeyRotationManager()
# Generate initial key
api_key = manager.generate_key(user_id=123)
# Validate key
if manager.validate_key(123, api_key):
print("Key valid")
# Rotate key after 90 days
new_key = manager.rotate_key(123, api_key)from typing import Set
class ScopedTokenManager:
"""Manage OAuth scopes for fine-grained permissions"""
AVAILABLE_SCOPES = {
'read:users': 'Read user data',
'write:users': 'Modify user data',
'read:posts': 'Read posts',
'write:posts': 'Create/edit posts',
'admin:delete': 'Delete resources',
'admin:users': 'Manage users'
}
def __init__(self):
self.tokens = {} # token_id -> {scopes, user_id, expires_at}
def create_token(self, user_id: int, scopes: Set[str]) -> str:
"""Create token with specific scopes"""
# Validate scopes
invalid = scopes - set(self.AVAILABLE_SCOPES.keys())
if invalid:
raise ValueError(f'Invalid scopes: {invalid}')
import secrets
token_id = secrets.token_hex(16)
self.tokens[token_id] = {
'user_id': user_id,
'scopes': scopes,
'created': datetime.utcnow(),
'expires_at': datetime.utcnow() + timedelta(days=30)
}
return token_id
def has_scope(self, token_id: str, required_scope: str) -> bool:
"""Check if token has required scope"""
if token_id not in self.tokens:
return False
token = self.tokens[token_id]
if datetime.utcnow() >= token['expires_at']:
return False
return required_scope in token['scopes']
def get_scopes(self, token_id: str) -> Set[str]:
"""Get token scopes"""
if token_id not in self.tokens:
return set()
return self.tokens[token_id]['scopes']
# Usage with Flask
from flask import Flask, request, jsonify
app = Flask(__name__)
scope_manager = ScopedTokenManager()
@app.route('/admin/users', methods=['DELETE'])
def delete_user():
"""Protected endpoint requiring admin scope"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not scope_manager.has_scope(token, 'admin:delete'):
return jsonify({'error': 'Insufficient permissions'}), 403
# Proceed with delete
return jsonify({'status': 'deleted'})from collections import defaultdict
from datetime import datetime, timedelta
class APIKeyRateLimiter:
"""Rate limit per API key"""
def __init__(self, requests_per_hour: int = 1000):
self.requests_per_hour = requests_per_hour
self.requests = defaultdict(list)
def is_allowed(self, api_key: str) -> bool:
"""Check if request is allowed"""
now = datetime.utcnow()
# Clean old requests (older than 1 hour)
self.requests[api_key] = [
req_time for req_time in self.requests[api_key]
if now - req_time < timedelta(hours=1)
]
# Check if under limit
if len(self.requests[api_key]) < self.requests_per_hour:
self.requests[api_key].append(now)
return True
return False
def get_remaining(self, api_key: str) -> int:
"""Get remaining requests"""
return max(0, self.requests_per_hour - len(self.requests[api_key]))
# Flask integration
from flask import Flask
app = Flask(__name__)
limiter = APIKeyRateLimiter(requests_per_hour=100)
@app.before_request
def check_rate_limit():
"""Check API key rate limit before each request"""
api_key = request.headers.get('X-API-Key')
if api_key and not limiter.is_allowed(api_key):
remaining = limiter.get_remaining(api_key)
return jsonify({
'error': 'Rate limit exceeded',
'remaining': remaining
}), 429
# Store remaining in response headers
if api_key:
g.remaining_requests = limiter.get_remaining(api_key)| Method | Security | Use Case |
|---|---|---|
| JWT | Medium | Stateless, mobile apps |
| OAuth 2.0 | High | Third-party access |
| mTLS | Very High | Service-to-service |
| API Keys | Low | Public APIs, simple |
| SAML | High | Enterprise SSO |
| Key Rotation | High | Compliance |
| Scopes | High | Fine-grained access |
Learn advanced rate limiting and throttling strategies.
Next: Advanced Rate Limiting →
Ready for advanced challenges? Try advanced challenges
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward