Django Integration
Django Integration
This guide covers how to integrate Skycloak authentication into Django applications using OAuth2/OIDC libraries and Django best practices.
Prerequisites
- Django 4.2+ or Django 5.x application (recommended)
- Python 3.10+ (Python 3.12 recommended for new projects)
- Skycloak cluster with configured realm and client
- Basic understanding of Django authentication system
ℹ️
Version Compatibility
- mozilla-django-oidc v5.0+ supports Django 4.2, 5.0, and 5.1
- mozilla-django-oidc v4.x supports Django 3.2 and 4.x
- Python 3.10+ is required for mozilla-django-oidc v5.0+
Quick Start
1. Install Dependencies
pip install mozilla-django-oidc python-jose requests
Or using django-allauth with OIDC support:
pip install django-allauth[socialaccount] pyjwt cryptography
2. Configure Django Settings
Add to your settings.py:
# settings.py
import os
from django.urls import reverse_lazy
# Add mozilla_django_oidc to installed apps
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'mozilla_django_oidc', # Add this
# your apps...
]
# Add OIDC authentication backend
AUTHENTICATION_BACKENDS = (
'mozilla_django_oidc.auth.OIDCAuthenticationBackend',
'django.contrib.auth.backends.ModelBackend', # Fallback
)
# OIDC Configuration
OIDC_RP_CLIENT_ID = os.environ.get('SKYCLOAK_CLIENT_ID', 'your-django-app')
OIDC_RP_CLIENT_SECRET = os.environ.get('SKYCLOAK_CLIENT_SECRET', 'your-secret')
OIDC_OP_AUTHORIZATION_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/auth'
OIDC_OP_TOKEN_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/token'
OIDC_OP_USER_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/userinfo'
OIDC_OP_JWKS_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/certs'
OIDC_OP_LOGOUT_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/logout'
# Configure scopes
OIDC_RP_SCOPES = 'openid profile email'
# Configure sign algorithm
OIDC_RP_SIGN_ALGO = 'RS256'
# Enable refresh tokens
OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS = 15 * 60 # 15 minutes
# Login URLs
LOGIN_REDIRECT_URL = reverse_lazy('home')
LOGOUT_REDIRECT_URL = reverse_lazy('index')3. Configure URLs
# urls.py
from django.urls import path, include
from django.contrib import admin
from . import views
urlpatterns = [
path('admin/', admin.site.urls),
path('oidc/', include('mozilla_django_oidc.urls')),
path('', views.index, name='index'),
path('home/', views.home, name='home'),
path('logout/', views.logout_view, name='logout'),
]
4. Create Views
# views.py
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from django.contrib.auth import logout
from django.conf import settings
import urllib.parse
def index(request):
return render(request, 'index.html')
@login_required
def home(request):
return render(request, 'home.html', {'user': request.user})
def logout_view(request):
"""Custom logout that redirects to Keycloak logout."""
logout(request)
# Build Keycloak logout URL
logout_url = settings.OIDC_OP_LOGOUT_ENDPOINT
redirect_url = request.build_absolute_uri('/')
return redirect(f"{logout_url}?redirect_uri={urllib.parse.quote(redirect_url)}")5. Create Templates
<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head>
<title>Django Skycloak App</title>
</head>
<body>
<nav>
{% if user.is_authenticated %}
<span>Welcome, {{ user.username }}!</span>
<a href="{% url 'logout' %}">Logout</a>
{% else %}
<a href="{% url 'oidc_authentication_init' %}">Login with Skycloak</a>
{% endif %}
</nav>
{% block content %}{% endblock %}
</body>
</html>
Advanced Authentication
Custom User Model Integration
# models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
class SkycloakUser(AbstractUser):
keycloak_id = models.CharField(max_length=255, unique=True, null=True)
realm_roles = models.JSONField(default=list)
client_roles = models.JSONField(default=dict)
groups = models.JSONField(default=list)
attributes = models.JSONField(default=dict)
class Meta:
db_table = 'skycloak_users'Custom Authentication Backend
# auth_backends.py
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
UserModel = get_user_model()
class SkycloakOIDCAuthenticationBackend(OIDCAuthenticationBackend):
"""Custom backend to handle Skycloak-specific claims."""
def create_user(self, claims):
"""Create a user with Skycloak claims."""
email = claims.get('email')
username = claims.get('preferred_username', email)
user = UserModel.objects.create_user(
username=username,
email=email,
first_name=claims.get('given_name', ''),
last_name=claims.get('family_name', ''),
keycloak_id=claims.get('sub'),
)
self.update_user_from_claims(user, claims)
return user
def update_user(self, user, claims):
"""Update user with latest claims from Skycloak."""
user.first_name = claims.get('given_name', '')
user.last_name = claims.get('family_name', '')
user.email = claims.get('email', '')
self.update_user_from_claims(user, claims)
user.save()
return user
def update_user_from_claims(self, user, claims):
"""Extract and save Skycloak-specific claims."""
# Extract realm roles
user.realm_roles = claims.get('realm_access', {}).get('roles', [])
# Extract client roles
user.client_roles = claims.get('resource_access', {})
# Extract groups
user.groups = claims.get('groups', [])
# Extract custom attributes
user.attributes = claims.get('attributes', {})
user.save()
def filter_users_by_claims(self, claims):
"""Find user by Keycloak ID or email."""
keycloak_id = claims.get('sub')
email = claims.get('email')
if keycloak_id:
try:
return UserModel.objects.filter(keycloak_id=keycloak_id)
except UserModel.DoesNotExist:
pass
return UserModel.objects.filter(email__iexact=email)
def verify_claims(self, claims):
"""Verify the claims are valid."""
if 'email_verified' in claims and not claims.get('email_verified'):
raise SuspiciousOperation('Email not verified')
return TrueRole-Based Access Control
# decorators.py
from functools import wraps
from django.core.exceptions import PermissionDenied
from django.contrib.auth.decorators import login_required
def require_realm_role(*roles):
    """Build a view decorator that requires at least one of the realm roles.

    Anonymous users are handled by ``login_required``. Authenticated users
    whose ``realm_roles`` attribute contains none of ``roles`` get
    ``PermissionDenied``.
    """
    def decorator(view_func):
        @wraps(view_func)
        @login_required
        def wrapped_view(request, *args, **kwargs):
            # A missing or null attribute is treated as "no roles".
            user_roles = getattr(request.user, 'realm_roles', None) or []
            if not any(role in user_roles for role in roles):
                raise PermissionDenied(
                    f"User must have one of these roles: {', '.join(roles)}"
                )
            return view_func(request, *args, **kwargs)
        return wrapped_view
    return decorator
def require_client_role(client_id, *roles):
    """Build a view decorator that requires at least one role on one client.

    Roles are read from ``request.user.client_roles[client_id]['roles']``.
    Anonymous users are handled by ``login_required``; authenticated users
    without any of ``roles`` get ``PermissionDenied``.
    """
    def decorator(view_func):
        @wraps(view_func)
        @login_required
        def wrapped_view(request, *args, **kwargs):
            # Missing or null entries at any level are treated as "no roles".
            client_roles = getattr(request.user, 'client_roles', None) or {}
            user_roles = (client_roles.get(client_id) or {}).get('roles') or []
            if not any(role in user_roles for role in roles):
                raise PermissionDenied(
                    f"User must have one of these client roles: {', '.join(roles)}"
                )
            return view_func(request, *args, **kwargs)
        return wrapped_view
    return decorator
# Usage
@require_realm_role('admin', 'manager')
def admin_view(request):
return render(request, 'admin.html')
@require_client_role('my-app', 'editor')
def editor_view(request):
return render(request, 'editor.html')
Template Tags for Roles
# templatetags/skycloak_tags.py
from django import template
from django.contrib.auth import get_user_model
register = template.Library()
@register.filter
def has_realm_role(user, role):
    """Return True if ``user.realm_roles`` contains ``role``.

    Users without the attribute (or with a null value) have no roles.
    """
    return role in (getattr(user, 'realm_roles', None) or [])
@register.filter
def has_any_realm_role(user, roles):
    """Return True if the user has any role from a comma-separated list.

    ``roles`` is a string such as ``"admin, manager"``; whitespace around
    each name is ignored and empty names are skipped.
    """
    user_roles = getattr(user, 'realm_roles', None) or []
    wanted = (name.strip() for name in roles.split(','))
    return any(name in user_roles for name in wanted if name)
@register.filter
def has_client_role(user, client_role):
    """Return True if the user has a client role given as ``"client_id:role"``.

    The argument is split on the first colon. A value without a colon
    matches nothing, so a malformed argument does not raise during
    template rendering.
    """
    client_id, separator, role = client_role.partition(':')
    if not separator:
        return False
    all_client_roles = getattr(user, 'client_roles', None) or {}
    granted = (all_client_roles.get(client_id) or {}).get('roles') or []
    return role in granted
# Usage in templates
# {% load skycloak_tags %}
# {% if user|has_realm_role:"admin" %}
# <a href="/admin">Admin Panel</a>
# {% endif %}
API Integration
Protected API Views
# api/views.py
from rest_framework import viewsets, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from mozilla_django_oidc.contrib.drf import OIDCAuthentication
class SkycloakPermission(permissions.BasePermission):
    """Custom permission class for Skycloak roles.

    A view opts in by declaring ``required_roles``; a view without that
    attribute (or with an empty list) only requires authentication.
    """

    def has_permission(self, request, view):
        """Allow authenticated users holding any of the view's required roles."""
        if not request.user.is_authenticated:
            return False
        required_roles = getattr(view, 'required_roles', [])
        if not required_roles:
            return True
        # A missing or null attribute is treated as "no roles".
        user_roles = getattr(request.user, 'realm_roles', None) or []
        return any(role in user_roles for role in required_roles)
class ProtectedViewSet(viewsets.ModelViewSet):
authentication_classes = [OIDCAuthentication]
permission_classes = [SkycloakPermission]
required_roles = ['api_user']
@action(detail=False, methods=['get'])
def me(self, request):
"""Get current user info."""
return Response({
'username': request.user.username,
'email': request.user.email,
'roles': getattr(request.user, 'realm_roles', []),
'groups': getattr(request.user, 'groups', []),
})
Token Validation Middleware
# middleware.py
import requests
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from jose import jwt, JWTError
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
class SkycloakTokenMiddleware(MiddlewareMixin):
"""Middleware to validate Skycloak tokens for API requests."""
def __init__(self, get_response):
self.get_response = get_response
self._keys = None
@property
def keys(self):
"""Lazy load and cache JWKS keys."""
if self._keys is None:
response = requests.get(settings.OIDC_OP_JWKS_ENDPOINT)
self._keys = response.json()
return self._keys
def process_request(self, request):
# Only validate API requests
if not request.path.startswith('/api/'):
return None
# Extract token from Authorization header
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return None
token = auth_header.split(' ')[1]
try:
# Decode and validate token
payload = jwt.decode(
token,
self.keys,
algorithms=['RS256'],
audience=settings.OIDC_RP_CLIENT_ID,
issuer=f"{settings.OIDC_OP_AUTHORIZATION_ENDPOINT.rsplit('/protocol', 1)[0]}"
)
# Attach claims to request
request.token_claims = payload
except JWTError as e:
logger.error(f"Token validation failed: {e}")
return JsonResponse({'error': 'Invalid token'}, status=401)
return NoneSession Management
Extended Session Handling
# session_manager.py
from django.contrib.sessions.backends.db import SessionStore
from django.utils import timezone
from datetime import timedelta
import logging
logger = logging.getLogger(__name__)
class SkycloakSessionManager:
"""Manage sessions with Skycloak token lifecycle."""
@staticmethod
def refresh_session(request):
"""Refresh session when token is renewed."""
if hasattr(request, 'user') and request.user.is_authenticated:
# Extend session to match token expiry
request.session.set_expiry(timedelta(hours=8))
# Store token refresh time
request.session['last_token_refresh'] = timezone.now().isoformat()
logger.info(f"Session refreshed for user: {request.user.username}")
@staticmethod
def check_session_validity(request):
"""Check if session needs refresh."""
if not request.user.is_authenticated:
return False
last_refresh = request.session.get('last_token_refresh')
if not last_refresh:
return True
# Check if more than 10 minutes since last refresh
last_refresh_time = timezone.datetime.fromisoformat(last_refresh)
time_since_refresh = timezone.now() - last_refresh_time
return time_since_refresh > timedelta(minutes=10)
@staticmethod
def end_session(request):
"""Properly end session and cleanup."""
if hasattr(request, 'session'):
request.session.flush()
# Additional cleanup if needed
if hasattr(request, 'user') and request.user.is_authenticated:
logger.info(f"Session ended for user: {request.user.username}")Single Sign-Out Support
# sso_handlers.py
from django.dispatch import receiver
from django.contrib.auth.signals import user_logged_out
from django.core.cache import cache
import requests
import logging
logger = logging.getLogger(__name__)
@receiver(user_logged_out)
def handle_logout(sender, request, user, **kwargs):
    """Record the logout in the cache and notify other services.

    Does nothing when there is no user or the user has no ``keycloak_id``
    value, so no ``logout_None`` cache key is ever written.
    """
    keycloak_id = getattr(user, 'keycloak_id', None)
    if not keycloak_id:
        return
    # Store logout state in cache
    cache.set(f"logout_{keycloak_id}", True, 300)  # 5 minutes
    # Optionally notify other services
    notify_services_of_logout(keycloak_id)
def notify_services_of_logout(keycloak_id):
"""Notify other services about user logout."""
# Implementation depends on your architecture
pass
Testing
Test Fixtures
# tests/fixtures.py
import json
from unittest.mock import Mock
from django.contrib.auth import get_user_model
UserModel = get_user_model()
def create_test_user_with_roles(username='testuser', roles=None, client_roles=None):
    """Create a test user with Skycloak attributes.

    Args:
        username: Login name; also used to derive the e-mail address and
            the ``keycloak_id``.
        roles: Realm roles. ``None`` selects the default ``['user']``; an
            explicit empty list creates a user without any realm role.
        client_roles: Client role mapping; defaults to an empty dict.
    """
    return UserModel.objects.create_user(
        username=username,
        email=f'{username}@example.com',
        keycloak_id=f'test-{username}-123',
        # Compare against None so that roles=[] is honoured.
        realm_roles=['user'] if roles is None else roles,
        client_roles={} if client_roles is None else client_roles,
    )
def mock_oidc_claims(overrides=None):
"""Generate mock OIDC claims."""
claims = {
'sub': 'test-user-123',
'preferred_username': 'testuser',
'email': 'testuser@example.com',
'email_verified': True,
'given_name': 'Test',
'family_name': 'User',
'realm_access': {
'roles': ['user', 'admin']
},
'resource_access': {
'my-app': {
'roles': ['editor']
}
},
'groups': ['/admins', '/editors'],
}
if overrides:
claims.update(overrides)
return claims
View Tests
# tests/test_views.py
from django.test import TestCase, Client
from django.urls import reverse
from unittest.mock import patch, Mock
from .fixtures import create_test_user_with_roles, mock_oidc_claims
class SkycloakViewTests(TestCase):
def setUp(self):
self.client = Client()
self.user = create_test_user_with_roles('testuser', roles=['admin'])
def test_protected_view_requires_login(self):
    """Test that protected views require authentication."""
    response = self.client.get(reverse('home'))
    self.assertEqual(response.status_code, 302)
    # The redirect target is a URL path, so compare it with the resolved
    # path of the OIDC login view rather than with the view's URL name.
    # This passes only when LOGIN_URL points at that view.
    self.assertTrue(
        response.url.startswith(reverse('oidc_authentication_init'))
    )
def test_role_protected_view(self):
"""Test role-based access control."""
self.client.force_login(self.user)
# User has admin role
response = self.client.get(reverse('admin_view'))
self.assertEqual(response.status_code, 200)
# Remove admin role
self.user.realm_roles = ['user']
self.user.save()
response = self.client.get(reverse('admin_view'))
self.assertEqual(response.status_code, 403)
@patch('mozilla_django_oidc.auth.OIDCAuthenticationBackend.verify_token')
@patch('mozilla_django_oidc.auth.OIDCAuthenticationBackend.get_userinfo')
def test_oidc_login_flow(self, mock_userinfo, mock_verify):
"""Test complete OIDC login flow."""
mock_verify.return_value = True
mock_userinfo.return_value = mock_oidc_claims()
# Simulate OIDC callback
response = self.client.get(
reverse('oidc_authentication_callback'),
{'code': 'test-code', 'state': 'test-state'}
)
self.assertEqual(response.status_code, 302)
self.assertIn('home', response.url)
API Tests
# tests/test_api.py
from rest_framework.test import APITestCase
from django.urls import reverse
from .fixtures import create_test_user_with_roles
class SkycloakAPITests(APITestCase):
def setUp(self):
self.user = create_test_user_with_roles('apiuser', roles=['api_user'])
def test_api_requires_authentication(self):
"""Test API requires authentication."""
response = self.client.get('/api/protected/')
self.assertEqual(response.status_code, 401)
def test_api_with_valid_user(self):
"""Test API access with authenticated user."""
self.client.force_authenticate(user=self.user)
response = self.client.get('/api/protected/me/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['username'], 'apiuser')
self.assertIn('api_user', response.data['roles'])
Production Considerations
Environment Configuration
# settings/production.py
import os
from .base import *
# Security settings
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
# OIDC Production settings
OIDC_USE_NONCE = True
OIDC_NONCE_SIZE = 32
# Session settings
SESSION_COOKIE_AGE = 28800 # 8 hours
SESSION_SAVE_EVERY_REQUEST = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# Cache configuration for JWKS
# Uses Django's built-in Redis backend. The django-redis CLIENT_CLASS option
# is not passed here: it belongs to the separate django-redis backend
# ('django_redis.cache.RedisCache') and is not understood by this one.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
        'KEY_PREFIX': 'skycloak_django',
        'TIMEOUT': 300,  # 5 minutes
    }
}
# Logging
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/skycloak.log',
'maxBytes': 1024 * 1024 * 15, # 15MB
'backupCount': 10,
},
},
'loggers': {
'mozilla_django_oidc': {
'handlers': ['file'],
'level': 'INFO',
},
'skycloak': {
'handlers': ['file'],
'level': 'INFO',
},
},
}
Health Check Endpoint
# health/views.py
from django.http import JsonResponse
from django.conf import settings
import requests
from django.core.cache import cache
def health_check(request):
    """Health check endpoint for monitoring.

    Probes the database, the Skycloak JWKS endpoint and the cache. A
    database failure marks the service ``unhealthy``; a Skycloak or cache
    failure marks it ``degraded`` without masking an ``unhealthy`` status.
    Responds with HTTP 200 only when every check passes, otherwise 503.
    """
    health_status = {
        'status': 'healthy',
        'checks': {}
    }

    def mark_degraded():
        # Never replace the more severe 'unhealthy' status.
        if health_status['status'] == 'healthy':
            health_status['status'] = 'degraded'

    # Check database
    try:
        from django.db import connection
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        health_status['checks']['database'] = 'ok'
    except Exception as e:
        health_status['checks']['database'] = f'error: {str(e)}'
        health_status['status'] = 'unhealthy'

    # Check Skycloak connectivity
    try:
        response = requests.get(
            settings.OIDC_OP_JWKS_ENDPOINT,
            timeout=5
        )
        response.raise_for_status()
        health_status['checks']['skycloak'] = 'ok'
    except Exception as e:
        health_status['checks']['skycloak'] = f'error: {str(e)}'
        mark_degraded()

    # Check cache with a write followed by a read of the same key
    try:
        cache.set('health_check', 'ok', 30)
        if cache.get('health_check') == 'ok':
            health_status['checks']['cache'] = 'ok'
        else:
            raise Exception('Cache write/read failed')
    except Exception as e:
        health_status['checks']['cache'] = f'error: {str(e)}'
        mark_degraded()

    status_code = 200 if health_status['status'] == 'healthy' else 503
    return JsonResponse(health_status, status=status_code)

Troubleshooting
Common Issues
- CSRF Token Mismatch
  - Ensure CSRF_COOKIE_SAMESITE is properly configured
  - Check that AJAX requests include CSRF token
  - Verify session cookies are being set correctly
- Redirect URI Mismatch
  - Double-check redirect URIs in Skycloak client configuration
  - Ensure Django’s ALLOWED_HOSTS includes all valid domains
  - Check for HTTP/HTTPS mismatches
- Token Expiration Issues
  - Configure OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS appropriately
  - Implement token refresh logic in middleware
  - Monitor token expiration in logs
Debug Logging
# Enable detailed logging in development
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
},
},
'loggers': {
'mozilla_django_oidc': {
'handlers': ['console'],
'level': 'DEBUG',
},
'django.request': {
'handlers': ['console'],
'level': 'DEBUG',
},
},
}
Performance Monitoring
# monitoring.py
from django.core.management.base import BaseCommand
from django.contrib.sessions.models import Session
from django.utils import timezone
from datetime import timedelta
class Command(BaseCommand):
    """Management command that prints session and login activity counts."""

    help = 'Monitor Skycloak integration performance'

    def handle(self, *args, **options):
        """Write the number of unexpired sessions and of logins in the last hour."""
        # Imported here because the module never defines `UserModel`;
        # get_user_model() resolves the project's configured user model.
        from django.contrib.auth import get_user_model

        now = timezone.now()
        # Check active sessions
        active_sessions = Session.objects.filter(expire_date__gt=now).count()
        # Check recent logins
        recent_logins = get_user_model().objects.filter(
            last_login__gte=now - timedelta(hours=1)
        ).count()
        self.stdout.write(
            f"Active sessions: {active_sessions}, "
            f"Recent logins: {recent_logins}"
        )