Django Integration

Django Integration

This guide covers how to integrate Skycloak authentication into Django applications using OAuth2/OIDC libraries and Django best practices.

Prerequisites

  • Django 4.2+ or Django 5.x application (recommended)
  • Python 3.10+ (Python 3.12 recommended for new projects)
  • Skycloak cluster with configured realm and client
  • Basic understanding of Django authentication system
ℹ️

Version Compatibility

  • mozilla-django-oidc v5.0+ supports Django 4.2, 5.0, and 5.1
  • mozilla-django-oidc v4.x supports Django 3.2 and 4.x
  • Python 3.10+ is required for mozilla-django-oidc v5.0+

Quick Start

1. Install Dependencies

pip install mozilla-django-oidc python-jose requests

Or using django-allauth with OIDC support:

pip install django-allauth[socialaccount] pyjwt cryptography

2. Configure Django Settings

Add to your settings.py:

# settings.py
import os
from django.urls import reverse_lazy

# Add mozilla_django_oidc to installed apps
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'mozilla_django_oidc',  # Add this
    # your apps...
]

# Add OIDC authentication backend
AUTHENTICATION_BACKENDS = (
    'mozilla_django_oidc.auth.OIDCAuthenticationBackend',
    'django.contrib.auth.backends.ModelBackend',  # Fallback
)

# OIDC Configuration
OIDC_RP_CLIENT_ID = os.environ.get('SKYCLOAK_CLIENT_ID', 'your-django-app')
OIDC_RP_CLIENT_SECRET = os.environ.get('SKYCLOAK_CLIENT_SECRET', 'your-secret')
OIDC_OP_AUTHORIZATION_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/auth'
OIDC_OP_TOKEN_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/token'
OIDC_OP_USER_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/userinfo'
OIDC_OP_JWKS_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/certs'
OIDC_OP_LOGOUT_ENDPOINT = 'https://your-cluster-id.app.skycloak.io/realms/your-realm/protocol/openid-connect/logout'

# Configure scopes
OIDC_RP_SCOPES = 'openid profile email'

# Configure sign algorithm
OIDC_RP_SIGN_ALGO = 'RS256'

# Enable refresh tokens
OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS = 15 * 60  # 15 minutes

# Login URLs
LOGIN_REDIRECT_URL = reverse_lazy('home')
LOGOUT_REDIRECT_URL = reverse_lazy('index')

3. Configure URLs

# urls.py
from django.urls import path, include
from django.contrib import admin
from . import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('oidc/', include('mozilla_django_oidc.urls')),
    path('', views.index, name='index'),
    path('home/', views.home, name='home'),
    path('logout/', views.logout_view, name='logout'),
]

4. Create Views

# views.py
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from django.contrib.auth import logout
from django.conf import settings
import urllib.parse

def index(request):
    return render(request, 'index.html')

@login_required
def home(request):
    return render(request, 'home.html', {'user': request.user})

def logout_view(request):
    """Custom logout that redirects to Keycloak logout."""
    logout(request)
    
    # Build Keycloak logout URL
    logout_url = settings.OIDC_OP_LOGOUT_ENDPOINT
    redirect_url = request.build_absolute_uri('/')
    
    return redirect(f"{logout_url}?redirect_uri={urllib.parse.quote(redirect_url)}")

5. Create Templates

<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Django Skycloak App</title>
</head>
<body>
    <nav>
        {% if user.is_authenticated %}
            <span>Welcome, {{ user.username }}!</span>
            <a href="{% url 'logout' %}">Logout</a>
        {% else %}
            <a href="{% url 'oidc_authentication_init' %}">Login with Skycloak</a>
        {% endif %}
    </nav>
    
    {% block content %}{% endblock %}
</body>
</html>

Advanced Authentication

Custom User Model Integration

# models.py
from django.contrib.auth.models import AbstractUser
from django.db import models

class SkycloakUser(AbstractUser):
    keycloak_id = models.CharField(max_length=255, unique=True, null=True)
    realm_roles = models.JSONField(default=list)
    client_roles = models.JSONField(default=dict)
    groups = models.JSONField(default=list)
    attributes = models.JSONField(default=dict)
    
    class Meta:
        db_table = 'skycloak_users'

Custom Authentication Backend

# auth_backends.py
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation

UserModel = get_user_model()

class SkycloakOIDCAuthenticationBackend(OIDCAuthenticationBackend):
    """Custom backend to handle Skycloak-specific claims."""
    
    def create_user(self, claims):
        """Create a user with Skycloak claims."""
        email = claims.get('email')
        username = claims.get('preferred_username', email)
        
        user = UserModel.objects.create_user(
            username=username,
            email=email,
            first_name=claims.get('given_name', ''),
            last_name=claims.get('family_name', ''),
            keycloak_id=claims.get('sub'),
        )
        
        self.update_user_from_claims(user, claims)
        return user
    
    def update_user(self, user, claims):
        """Update user with latest claims from Skycloak."""
        user.first_name = claims.get('given_name', '')
        user.last_name = claims.get('family_name', '')
        user.email = claims.get('email', '')
        
        self.update_user_from_claims(user, claims)
        user.save()
        return user
    
    def update_user_from_claims(self, user, claims):
        """Extract and save Skycloak-specific claims."""
        # Extract realm roles
        user.realm_roles = claims.get('realm_access', {}).get('roles', [])
        
        # Extract client roles
        user.client_roles = claims.get('resource_access', {})
        
        # Extract groups
        user.groups = claims.get('groups', [])
        
        # Extract custom attributes
        user.attributes = claims.get('attributes', {})
        
        user.save()
    
    def filter_users_by_claims(self, claims):
        """Find user by Keycloak ID or email."""
        keycloak_id = claims.get('sub')
        email = claims.get('email')
        
        if keycloak_id:
            try:
                return UserModel.objects.filter(keycloak_id=keycloak_id)
            except UserModel.DoesNotExist:
                pass
        
        return UserModel.objects.filter(email__iexact=email)
    
    def verify_claims(self, claims):
        """Verify the claims are valid."""
        if 'email_verified' in claims and not claims.get('email_verified'):
            raise SuspiciousOperation('Email not verified')
        return True

Role-Based Access Control

# decorators.py
from functools import wraps
from django.core.exceptions import PermissionDenied
from django.contrib.auth.decorators import login_required

def require_realm_role(*roles):
    """Decorator to check realm roles."""
    def decorator(view_func):
        @wraps(view_func)
        @login_required
        def wrapped_view(request, *args, **kwargs):
            user_roles = getattr(request.user, 'realm_roles', [])
            
            if not any(role in user_roles for role in roles):
                raise PermissionDenied(
                    f"User must have one of these roles: {', '.join(roles)}"
                )
            
            return view_func(request, *args, **kwargs)
        return wrapped_view
    return decorator

def require_client_role(client_id, *roles):
    """Decorator to check client-specific roles."""
    def decorator(view_func):
        @wraps(view_func)
        @login_required
        def wrapped_view(request, *args, **kwargs):
            client_roles = getattr(request.user, 'client_roles', {})
            user_roles = client_roles.get(client_id, {}).get('roles', [])
            
            if not any(role in user_roles for role in roles):
                raise PermissionDenied(
                    f"User must have one of these client roles: {', '.join(roles)}"
                )
            
            return view_func(request, *args, **kwargs)
        return wrapped_view
    return decorator

# Usage
@require_realm_role('admin', 'manager')
def admin_view(request):
    return render(request, 'admin.html')

@require_client_role('my-app', 'editor')
def editor_view(request):
    return render(request, 'editor.html')

Template Tags for Roles

# templatetags/skycloak_tags.py
from django import template
from django.contrib.auth import get_user_model

register = template.Library()

@register.filter
def has_realm_role(user, role):
    """Check if user has a specific realm role."""
    if not hasattr(user, 'realm_roles'):
        return False
    return role in user.realm_roles

@register.filter
def has_any_realm_role(user, roles):
    """Check if user has any of the specified roles."""
    if not hasattr(user, 'realm_roles'):
        return False
    role_list = [r.strip() for r in roles.split(',')]
    return any(role in user.realm_roles for role in role_list)

@register.filter
def has_client_role(user, client_role):
    """Check if user has a specific client role."""
    if not hasattr(user, 'client_roles'):
        return False
    
    client_id, role = client_role.split(':')
    client_roles = user.client_roles.get(client_id, {}).get('roles', [])
    return role in client_roles

# Usage in templates
# {% load skycloak_tags %}
# {% if user|has_realm_role:"admin" %}
#     <a href="/admin">Admin Panel</a>
# {% endif %}

API Integration

Protected API Views

# api/views.py
from rest_framework import viewsets, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from mozilla_django_oidc.contrib.drf import OIDCAuthentication

class SkycloakPermission(permissions.BasePermission):
    """Custom permission class for Skycloak roles."""
    
    def has_permission(self, request, view):
        # Check if user is authenticated
        if not request.user.is_authenticated:
            return False
        
        # Get required roles from view
        required_roles = getattr(view, 'required_roles', [])
        if not required_roles:
            return True
        
        # Check user roles
        user_roles = getattr(request.user, 'realm_roles', [])
        return any(role in user_roles for role in required_roles)

class ProtectedViewSet(viewsets.ModelViewSet):
    authentication_classes = [OIDCAuthentication]
    permission_classes = [SkycloakPermission]
    required_roles = ['api_user']
    
    @action(detail=False, methods=['get'])
    def me(self, request):
        """Get current user info."""
        return Response({
            'username': request.user.username,
            'email': request.user.email,
            'roles': getattr(request.user, 'realm_roles', []),
            'groups': getattr(request.user, 'groups', []),
        })

Token Validation Middleware

# middleware.py
import requests
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from jose import jwt, JWTError
from django.conf import settings
import logging

logger = logging.getLogger(__name__)

class SkycloakTokenMiddleware(MiddlewareMixin):
    """Middleware to validate Skycloak tokens for API requests."""
    
    def __init__(self, get_response):
        self.get_response = get_response
        self._keys = None
    
    @property
    def keys(self):
        """Lazy load and cache JWKS keys."""
        if self._keys is None:
            response = requests.get(settings.OIDC_OP_JWKS_ENDPOINT)
            self._keys = response.json()
        return self._keys
    
    def process_request(self, request):
        # Only validate API requests
        if not request.path.startswith('/api/'):
            return None
        
        # Extract token from Authorization header
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if not auth_header.startswith('Bearer '):
            return None
        
        token = auth_header.split(' ')[1]
        
        try:
            # Decode and validate token
            payload = jwt.decode(
                token,
                self.keys,
                algorithms=['RS256'],
                audience=settings.OIDC_RP_CLIENT_ID,
                issuer=f"{settings.OIDC_OP_AUTHORIZATION_ENDPOINT.rsplit('/protocol', 1)[0]}"
            )
            
            # Attach claims to request
            request.token_claims = payload
            
        except JWTError as e:
            logger.error(f"Token validation failed: {e}")
            return JsonResponse({'error': 'Invalid token'}, status=401)
        
        return None

Session Management

Extended Session Handling

# session_manager.py
from django.contrib.sessions.backends.db import SessionStore
from django.utils import timezone
from datetime import timedelta
import logging

logger = logging.getLogger(__name__)

class SkycloakSessionManager:
    """Manage sessions with Skycloak token lifecycle."""
    
    @staticmethod
    def refresh_session(request):
        """Refresh session when token is renewed."""
        if hasattr(request, 'user') and request.user.is_authenticated:
            # Extend session to match token expiry
            request.session.set_expiry(timedelta(hours=8))
            
            # Store token refresh time
            request.session['last_token_refresh'] = timezone.now().isoformat()
            
            logger.info(f"Session refreshed for user: {request.user.username}")
    
    @staticmethod
    def check_session_validity(request):
        """Check if session needs refresh."""
        if not request.user.is_authenticated:
            return False
        
        last_refresh = request.session.get('last_token_refresh')
        if not last_refresh:
            return True
        
        # Check if more than 10 minutes since last refresh
        last_refresh_time = timezone.datetime.fromisoformat(last_refresh)
        time_since_refresh = timezone.now() - last_refresh_time
        
        return time_since_refresh > timedelta(minutes=10)
    
    @staticmethod
    def end_session(request):
        """Properly end session and cleanup."""
        if hasattr(request, 'session'):
            request.session.flush()
        
        # Additional cleanup if needed
        if hasattr(request, 'user') and request.user.is_authenticated:
            logger.info(f"Session ended for user: {request.user.username}")

Single Sign-Out Support

# sso_handlers.py
from django.dispatch import receiver
from django.contrib.auth.signals import user_logged_out
from django.core.cache import cache
import requests
import logging

logger = logging.getLogger(__name__)

@receiver(user_logged_out)
def handle_logout(sender, request, user, **kwargs):
    """Handle logout and notify Skycloak."""
    if user and hasattr(user, 'keycloak_id'):
        # Store logout state in cache
        cache.set(f"logout_{user.keycloak_id}", True, 300)  # 5 minutes
        
        # Optionally notify other services
        notify_services_of_logout(user.keycloak_id)

def notify_services_of_logout(keycloak_id):
    """Notify other services about user logout."""
    # Implementation depends on your architecture
    pass

Testing

Test Fixtures

# tests/fixtures.py
import json
from unittest.mock import Mock
from django.contrib.auth import get_user_model

UserModel = get_user_model()

def create_test_user_with_roles(username='testuser', roles=None, client_roles=None):
    """Create a test user with Skycloak attributes."""
    user = UserModel.objects.create_user(
        username=username,
        email=f'{username}@example.com',
        keycloak_id=f'test-{username}-123',
        realm_roles=roles or ['user'],
        client_roles=client_roles or {},
    )
    return user

def mock_oidc_claims(overrides=None):
    """Generate mock OIDC claims."""
    claims = {
        'sub': 'test-user-123',
        'preferred_username': 'testuser',
        'email': '[email protected]',
        'email_verified': True,
        'given_name': 'Test',
        'family_name': 'User',
        'realm_access': {
            'roles': ['user', 'admin']
        },
        'resource_access': {
            'my-app': {
                'roles': ['editor']
            }
        },
        'groups': ['/admins', '/editors'],
    }
    
    if overrides:
        claims.update(overrides)
    
    return claims

View Tests

# tests/test_views.py
from django.test import TestCase, Client
from django.urls import reverse
from unittest.mock import patch, Mock
from .fixtures import create_test_user_with_roles, mock_oidc_claims

class SkycloakViewTests(TestCase):
    def setUp(self):
        self.client = Client()
        self.user = create_test_user_with_roles('testuser', roles=['admin'])
    
    def test_protected_view_requires_login(self):
        """Test that protected views require authentication."""
        response = self.client.get(reverse('home'))
        self.assertEqual(response.status_code, 302)
        self.assertIn('oidc_authentication_init', response.url)
    
    def test_role_protected_view(self):
        """Test role-based access control."""
        self.client.force_login(self.user)
        
        # User has admin role
        response = self.client.get(reverse('admin_view'))
        self.assertEqual(response.status_code, 200)
        
        # Remove admin role
        self.user.realm_roles = ['user']
        self.user.save()
        
        response = self.client.get(reverse('admin_view'))
        self.assertEqual(response.status_code, 403)
    
    @patch('mozilla_django_oidc.auth.OIDCAuthenticationBackend.verify_token')
    @patch('mozilla_django_oidc.auth.OIDCAuthenticationBackend.get_userinfo')
    def test_oidc_login_flow(self, mock_userinfo, mock_verify):
        """Test complete OIDC login flow."""
        mock_verify.return_value = True
        mock_userinfo.return_value = mock_oidc_claims()
        
        # Simulate OIDC callback
        response = self.client.get(
            reverse('oidc_authentication_callback'),
            {'code': 'test-code', 'state': 'test-state'}
        )
        
        self.assertEqual(response.status_code, 302)
        self.assertIn('home', response.url)

API Tests

# tests/test_api.py
from rest_framework.test import APITestCase
from django.urls import reverse
from .fixtures import create_test_user_with_roles

class SkycloakAPITests(APITestCase):
    def setUp(self):
        self.user = create_test_user_with_roles('apiuser', roles=['api_user'])
    
    def test_api_requires_authentication(self):
        """Test API requires authentication."""
        response = self.client.get('/api/protected/')
        self.assertEqual(response.status_code, 401)
    
    def test_api_with_valid_user(self):
        """Test API access with authenticated user."""
        self.client.force_authenticate(user=self.user)
        
        response = self.client.get('/api/protected/me/')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.data['username'], 'apiuser')
        self.assertIn('api_user', response.data['roles'])

Production Considerations

Environment Configuration

# settings/production.py
import os
from .base import *

# Security settings
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True

# OIDC Production settings
OIDC_USE_NONCE = True
OIDC_NONCE_SIZE = 32

# Session settings
SESSION_COOKIE_AGE = 28800  # 8 hours
SESSION_SAVE_EVERY_REQUEST = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'

# Cache configuration for JWKS
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'KEY_PREFIX': 'skycloak_django',
        'TIMEOUT': 300,  # 5 minutes
    }
}

# Logging
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/skycloak.log',
            'maxBytes': 1024 * 1024 * 15,  # 15MB
            'backupCount': 10,
        },
    },
    'loggers': {
        'mozilla_django_oidc': {
            'handlers': ['file'],
            'level': 'INFO',
        },
        'skycloak': {
            'handlers': ['file'],
            'level': 'INFO',
        },
    },
}

Health Check Endpoint

# health/views.py
from django.http import JsonResponse
from django.conf import settings
import requests
from django.core.cache import cache

def health_check(request):
    """Health check endpoint for monitoring."""
    health_status = {
        'status': 'healthy',
        'checks': {}
    }
    
    # Check database
    try:
        from django.db import connection
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        health_status['checks']['database'] = 'ok'
    except Exception as e:
        health_status['checks']['database'] = f'error: {str(e)}'
        health_status['status'] = 'unhealthy'
    
    # Check Skycloak connectivity
    try:
        response = requests.get(
            settings.OIDC_OP_JWKS_ENDPOINT,
            timeout=5
        )
        response.raise_for_status()
        health_status['checks']['skycloak'] = 'ok'
    except Exception as e:
        health_status['checks']['skycloak'] = f'error: {str(e)}'
        health_status['status'] = 'degraded'
    
    # Check cache
    try:
        cache.set('health_check', 'ok', 30)
        if cache.get('health_check') == 'ok':
            health_status['checks']['cache'] = 'ok'
        else:
            raise Exception('Cache write/read failed')
    except Exception as e:
        health_status['checks']['cache'] = f'error: {str(e)}'
        health_status['status'] = 'degraded'
    
    status_code = 200 if health_status['status'] == 'healthy' else 503
    return JsonResponse(health_status, status=status_code)

Troubleshooting

Common Issues

  1. CSRF Token Mismatch

    • Ensure CSRF_COOKIE_SAMESITE is properly configured
    • Check that AJAX requests include CSRF token
    • Verify session cookies are being set correctly
  2. Redirect URI Mismatch

    • Double-check redirect URIs in Skycloak client configuration
    • Ensure Django’s ALLOWED_HOSTS includes all valid domains
    • Check for HTTP/HTTPS mismatches
  3. Token Expiration Issues

    • Configure OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS appropriately
    • Implement token refresh logic in middleware
    • Monitor token expiration in logs

Debug Logging

# Enable detailed logging in development
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'mozilla_django_oidc': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
        'django.request': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
    },
}

Performance Monitoring

# monitoring.py
from django.core.management.base import BaseCommand
from django.contrib.sessions.models import Session
from django.utils import timezone
from datetime import timedelta

class Command(BaseCommand):
    help = 'Monitor Skycloak integration performance'
    
    def handle(self, *args, **options):
        # Check active sessions
        active_sessions = Session.objects.filter(
            expire_date__gt=timezone.now()
        ).count()
        
        # Check recent logins
        recent_logins = UserModel.objects.filter(
            last_login__gte=timezone.now() - timedelta(hours=1)
        ).count()
        
        self.stdout.write(
            f"Active sessions: {active_sessions}, "
            f"Recent logins: {recent_logins}"
        )

Next Steps