Python-Keycloak Library

This guide covers how to use the python-keycloak library for direct interaction with Keycloak’s admin REST API and OpenID Connect endpoints. Unlike web framework integrations (Django, Flask), this library is designed for backend services, user provisioning, and admin automation tasks.

Prerequisites

  • Python 3.10+ (required)
  • Skycloak instance with configured realm
  • Admin credentials OR service account client with appropriate roles
  • Basic understanding of OAuth2/OIDC concepts

Version Compatibility

python-keycloak    Python    Keycloak
7.0.x (latest)     3.10+     22-26
6.0.x              3.9+      21-25
5.0.x              3.8+      20-24

Note: python-keycloak 7.0+ requires Python 3.10 or higher due to type hints and modern language features.
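
To see which of the versions listed above applies to an environment, the interpreter and installed package versions can be checked at runtime. This is a minimal sketch using only the standard library; the helper name check_environment and its defaults are illustrative and not part of python-keycloak.

```python
import sys
from importlib import metadata


def check_environment(min_python=(3, 10), package="python-keycloak"):
    """Return (python_ok, installed_version_or_None). Hypothetical helper."""
    python_ok = sys.version_info[:2] >= tuple(min_python)
    try:
        installed = metadata.version(package)
    except metadata.PackageNotFoundError:
        installed = None  # Package is not installed in this environment
    return python_ok, installed


python_ok, installed = check_environment()
print(f"Python {sys.version_info.major}.{sys.version_info.minor} meets minimum: {python_ok}")
print(f"python-keycloak installed: {installed or 'no'}")
```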

Installation

pip install python-keycloak

This automatically installs required dependencies:

  • PyJWT - JWT token handling
  • cryptography - Cryptographic operations
  • requests - HTTP client
  • requests-toolbelt - Multipart encoding

Alternative installation via conda-forge:

conda install -c conda-forge python-keycloak

PyJWT Troubleshooting

Common Error: “Invalid audience”

When decoding tokens with keycloak_openid.decode_token(), you may encounter:

jwt.exceptions.InvalidAudienceError: Audience doesn't match

This occurs because Keycloak doesn’t include the client_id in the aud (audience) claim by default.

Solution 1: Disable Audience Verification (Quick Fix)

For development or when you control the token source:

from keycloak import KeycloakOpenID

keycloak_openid = KeycloakOpenID(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="your-client",
    client_secret_key="your-secret"
)

# Get a token
token = keycloak_openid.token("username", "password")

# Decode with audience verification disabled
options = {
    "verify_aud": False,      # Disable audience check
    "verify_signature": True,  # Keep signature verification
    "verify_exp": True         # Keep expiration check
}

token_info = keycloak_openid.decode_token(
    token['access_token'],
    options=options
)

Solution 2: Add Audience Mapper (Recommended)

Configure Keycloak to include your client in the audience claim:

  1. Navigate to Clients → Your Client → Client Scopes tab
  2. Click on the dedicated scope (e.g., your-client-dedicated)
  3. Click Add Mapper → By configuration → Audience
  4. Configure the mapper:
    • Name: audience-mapper
    • Included Client Audience: Select your client
    • Add to ID token: Off
    • Add to access token: On
  5. Click Save

After this configuration, tokens will include your client_id in the aud claim, and standard token decoding will work:

# Now works without disabling audience verification
token_info = keycloak_openid.decode_token(token['access_token'])

Quick Start: KeycloakOpenID

KeycloakOpenID handles OpenID Connect operations: authentication, token management, and user info retrieval.

Initialization

from keycloak import KeycloakOpenID

keycloak_openid = KeycloakOpenID(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="your-client",
    client_secret_key="your-secret"  # Optional for public clients
)

Get Token (Resource Owner Password Grant)

# Get token with username/password
token = keycloak_openid.token("username", "password")

print(token['access_token'])
print(token['refresh_token'])
print(token['expires_in'])

Client Credentials Grant

# For service-to-service authentication
token = keycloak_openid.token(grant_type="client_credentials")

Refresh Token

# Refresh an expired access token
new_token = keycloak_openid.refresh_token(token['refresh_token'])

Decode Token

# Decode and validate token
KEYCLOAK_PUBLIC_KEY = (
    "-----BEGIN PUBLIC KEY-----\n"
    + keycloak_openid.public_key()
    + "\n-----END PUBLIC KEY-----"
)

token_info = keycloak_openid.decode_token(
    token['access_token'],
    key=KEYCLOAK_PUBLIC_KEY,
    options={"verify_aud": False}  # See PyJWT Troubleshooting section
)

print(token_info['sub'])           # User ID
print(token_info['preferred_username'])
print(token_info['realm_access'])  # Realm roles
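
Role checks against the decoded claims tend to repeat the same nested lookups. A small sketch of a helper for this, assuming the realm_access and resource_access claim layout shown in this guide; extract_roles and has_role are hypothetical names, and the sample claims are made up.

```python
def extract_roles(claims, client_id=None):
    """Collect realm roles and, optionally, one client's roles from decoded claims."""
    roles = set(claims.get("realm_access", {}).get("roles", []))
    if client_id is not None:
        client_access = claims.get("resource_access", {}).get(client_id, {})
        roles.update(client_access.get("roles", []))
    return roles


def has_role(claims, role, client_id=None):
    """Return True if the role is present among the extracted roles."""
    return role in extract_roles(claims, client_id)


# Made-up claims in the shape Keycloak access tokens commonly use
sample_claims = {
    "sub": "user-uuid-123",
    "realm_access": {"roles": ["user"]},
    "resource_access": {"my-app": {"roles": ["editor"]}},
}
print(has_role(sample_claims, "editor", client_id="my-app"))  # prints True
print(has_role(sample_claims, "admin"))                       # prints False
```

Merging realm and client roles into one set keeps the check short, but it hides where a role came from; keep them separate if role names can collide.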

Get User Info

# Get user info from userinfo endpoint
userinfo = keycloak_openid.userinfo(token['access_token'])

print(userinfo['email'])
print(userinfo['name'])

Introspect Token

# Validate token and get metadata
introspection = keycloak_openid.introspect(token['access_token'])

if introspection['active']:
    print(f"Token valid, expires: {introspection['exp']}")
else:
    print("Token is invalid or expired")

Logout

# Logout user (invalidate refresh token)
keycloak_openid.logout(token['refresh_token'])

Quick Start: KeycloakAdmin

KeycloakAdmin provides access to the Admin REST API for user management, realm configuration, and more.

Initialization with Username/Password

from keycloak import KeycloakAdmin

keycloak_admin = KeycloakAdmin(
    server_url="https://your-cluster.app.skycloak.io",
    username="admin",
    password="admin-password",
    realm_name="your-realm",       # Target realm for admin operations
    user_realm_name="master"       # Realm in which the admin user authenticates
)

Initialization with Token

from keycloak import KeycloakAdmin, KeycloakOpenIDConnection

# Create connection with existing token
connection = KeycloakOpenIDConnection(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    token=existing_token  # Pre-acquired admin token
)

keycloak_admin = KeycloakAdmin(connection=connection)

Initialization with Service Account

from keycloak import KeycloakAdmin, KeycloakOpenIDConnection

# Use client credentials (service account must have admin roles)
connection = KeycloakOpenIDConnection(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="admin-cli",
    client_secret_key="your-service-account-secret"
)

keycloak_admin = KeycloakAdmin(connection=connection)

User Management

Create User

# Create a basic user
user_id = keycloak_admin.create_user({
    "username": "newuser",
    "email": "newuser@example.com",
    "enabled": True,
    "firstName": "New",
    "lastName": "User",
    "credentials": [{
        "type": "password",
        "value": "initial-password",
        "temporary": True  # Force password change on first login
    }]
})

print(f"Created user with ID: {user_id}")

Create User with Attributes

user_id = keycloak_admin.create_user({
    "username": "enterprise-user",
    "email": "enterprise-user@example.com",
    "enabled": True,
    "firstName": "Enterprise",
    "lastName": "User",
    "emailVerified": True,
    "attributes": {
        "company": ["Acme Corp"],
        "department": ["Engineering"],
        "employee_id": ["EMP-12345"]
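    },
    "groups": ["/engineering", "/full-time"],
    # Keycloak may ignore realmRoles on user creation; if the roles are missing,
    # assign them afterwards as shown under Role Management
    "realmRoles": ["user", "viewer"]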
})

Get Users

# Get all users
users = keycloak_admin.get_users({})

# Get users with filtering
users = keycloak_admin.get_users({
    "email": "user@example.com"
})

# Get users with pagination
users = keycloak_admin.get_users({
    "first": 0,      # Offset
    "max": 100,       # Limit
    "search": "john"  # Search by username, email, first/last name
})

# Count total users
count = keycloak_admin.users_count()
print(f"Total users: {count}")

Get User by ID

user = keycloak_admin.get_user(user_id)

print(user['username'])
print(user['email'])
print(user['attributes'])

Update User

# Update user properties
keycloak_admin.update_user(user_id, {
    "firstName": "Updated",
    "lastName": "Name",
    "attributes": {
        "department": ["Marketing"]  # Overwrites existing
    }
})

Enable/Disable User

# Disable user
keycloak_admin.update_user(user_id, {"enabled": False})

# Enable user
keycloak_admin.update_user(user_id, {"enabled": True})

Delete User

keycloak_admin.delete_user(user_id)

Set User Password

# Set permanent password
keycloak_admin.set_user_password(
    user_id=user_id,
    password="new-secure-password",
    temporary=False
)

# Set temporary password (forces change on next login)
keycloak_admin.set_user_password(
    user_id=user_id,
    password="temp-password",
    temporary=True
)

Send Verification Email

# Send email verification
keycloak_admin.send_verify_email(user_id)

# Send password reset email
keycloak_admin.send_update_account(
    user_id=user_id,
    payload=["UPDATE_PASSWORD"]
)

Role Management

Get Realm Roles

# Get all realm roles
roles = keycloak_admin.get_realm_roles()

for role in roles:
    print(f"{role['name']}: {role.get('description', 'No description')}")

# Get specific role
admin_role = keycloak_admin.get_realm_role("admin")

Create Realm Role

keycloak_admin.create_realm_role({
    "name": "api_user",
    "description": "Can access API endpoints",
    "composite": False
})

Assign Realm Roles to User

# Get role objects
roles = keycloak_admin.get_realm_roles()
user_role = next(r for r in roles if r['name'] == 'user')
api_role = next(r for r in roles if r['name'] == 'api_user')

# Assign roles
keycloak_admin.assign_realm_roles(
    user_id=user_id,
    roles=[user_role, api_role]
)
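
Note that next() on a generator with no match raises StopIteration, which gives little hint about which role was missing. One possible alternative is a lookup that reports unknown names explicitly. resolve_roles is a hypothetical helper, and the role list below is sample data rather than real API output.

```python
def resolve_roles(available_roles, wanted_names):
    """Map role names to role representations, failing loudly on unknown names."""
    by_name = {role["name"]: role for role in available_roles}
    missing = [name for name in wanted_names if name not in by_name]
    if missing:
        raise KeyError(f"Realm roles not found: {', '.join(missing)}")
    return [by_name[name] for name in wanted_names]


# Sample data in the shape returned by get_realm_roles()
sample_roles = [
    {"id": "1", "name": "user"},
    {"id": "2", "name": "api_user"},
]
selected = resolve_roles(sample_roles, ["user", "api_user"])
print([role["name"] for role in selected])  # prints ['user', 'api_user']
```

The result can be passed straight to assign_realm_roles(user_id=user_id, roles=selected).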

Get User’s Realm Roles

user_roles = keycloak_admin.get_realm_roles_of_user(user_id)

for role in user_roles:
    print(role['name'])

Remove Realm Roles from User

keycloak_admin.delete_realm_roles_of_user(
    user_id=user_id,
    roles=[api_role]
)

Client Roles

# Get client ID (internal ID, not client_id)
clients = keycloak_admin.get_clients()
my_client = next(c for c in clients if c['clientId'] == 'my-app')
client_internal_id = my_client['id']

# Get client roles
client_roles = keycloak_admin.get_client_roles(client_internal_id)

# Create client role
keycloak_admin.create_client_role(
    client_internal_id,
    {"name": "editor", "description": "Can edit content"}
)

# Assign client role to user
editor_role = keycloak_admin.get_client_role(client_internal_id, "editor")
keycloak_admin.assign_client_role(
    user_id=user_id,
    client_id=client_internal_id,
    roles=[editor_role]
)

Token Operations

Token Generation Flows

# Password grant (user authentication)
token = keycloak_openid.token("username", "password")

# Client credentials (service authentication)
token = keycloak_openid.token(grant_type="client_credentials")

# Authorization code (web flow)
# First, redirect user to:
auth_url = keycloak_openid.auth_url(
    redirect_uri="https://your-app.com/callback",
    scope="openid profile email"
)

# Then exchange code for token:
token = keycloak_openid.token(
    grant_type="authorization_code",
    code="authorization-code-from-callback",
    redirect_uri="https://your-app.com/callback"
)

Token Validation with JWKS

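import requests
# Requires python-jose (pip install python-jose); it is not among the
# dependencies listed under Installation
from jose import jwt

# Get JWKS (JSON Web Key Set)
jwks_url = keycloak_openid.well_known()['jwks_uri']
jwks = requests.get(jwks_url, timeout=10).json()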

# Validate token with JWKS
try:
    claims = jwt.decode(
        token['access_token'],
        jwks,
        algorithms=['RS256'],
        audience=keycloak_openid.client_id,
        issuer=keycloak_openid.well_known()['issuer']
    )
    print(f"Valid token for user: {claims['sub']}")
except jwt.JWTError as e:
    print(f"Invalid token: {e}")
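
A realm can publish several keys in its JWKS, for example during key rotation. When validation fails, it can be useful to check which key the token claims to be signed with. The sketch below matches the kid in the token header against the key set using only the standard library; token_header and find_signing_key are hypothetical helpers, and the demo token and key set are made up.

```python
import base64
import json


def token_header(jwt_token):
    """Decode the JWT header segment (no signature verification involved)."""
    header_b64 = jwt_token.split(".")[0]
    header_b64 += "=" * (-len(header_b64) % 4)  # Restore base64url padding
    return json.loads(base64.urlsafe_b64decode(header_b64))


def find_signing_key(jwks, jwt_token):
    """Return the JWK whose kid matches the token header, or None."""
    kid = token_header(jwt_token).get("kid")
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    return None


# Hand-built, unsigned demo token ("e30" is the base64url form of "{}")
header = base64.urlsafe_b64encode(json.dumps({"alg": "RS256", "kid": "key-2"}).encode())
demo_token = header.rstrip(b"=").decode() + ".e30."
demo_jwks = {"keys": [{"kid": "key-1", "kty": "RSA"}, {"kid": "key-2", "kty": "RSA"}]}
print(find_signing_key(demo_jwks, demo_token))
```

A result of None for a real token suggests the JWKS is stale or belongs to a different realm.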

Token Refresh Pattern

import time

class TokenManager:
    def __init__(self, keycloak_openid):
        self.kc = keycloak_openid
        self.token = None
        self.token_expiry = 0

    def get_token(self):
        """Get valid access token, refreshing if needed."""
        current_time = time.time()

        # No token yet
        if self.token is None:
            raise ValueError("No token available. Call authenticate() first.")

        # Token expired or about to expire (30s buffer)
        if current_time >= self.token_expiry - 30:
            self._refresh_token()

        return self.token['access_token']

    def authenticate(self, username, password):
        """Initial authentication."""
        self.token = self.kc.token(username, password)
        self.token_expiry = time.time() + self.token['expires_in']

    def _refresh_token(self):
        """Refresh the access token."""
        try:
            self.token = self.kc.refresh_token(self.token['refresh_token'])
            self.token_expiry = time.time() + self.token['expires_in']
        except Exception as e:
            # Refresh token expired or revoked; the caller must re-authenticate
            raise ValueError(f"Token refresh failed: {e}") from e

Error Handling for Expired Tokens

from keycloak.exceptions import KeycloakAuthenticationError

def make_authenticated_request(token_manager, request_func):
    """Make request with automatic token refresh."""
    try:
        token = token_manager.get_token()
        return request_func(token)
    except KeycloakAuthenticationError:
        # Token invalid, try refresh
        token_manager._refresh_token()
        token = token_manager.get_token()
        return request_func(token)

Advanced Features

Async Operations

python-keycloak 7.0+ supports async methods with the a_ prefix:

import asyncio
from keycloak import KeycloakOpenID

async def async_auth():
    keycloak_openid = KeycloakOpenID(
        server_url="https://your-cluster.app.skycloak.io",
        realm_name="your-realm",
        client_id="your-client",
        client_secret_key="your-secret"
    )

    # Async token request
    token = await keycloak_openid.a_token("username", "password")

    # Async userinfo
    userinfo = await keycloak_openid.a_userinfo(token['access_token'])

    # Async introspect
    introspection = await keycloak_openid.a_introspect(token['access_token'])

    return userinfo

# Run async
result = asyncio.run(async_auth())

PKCE Flow Support

import secrets
import hashlib
import base64

# Generate PKCE verifier and challenge
code_verifier = secrets.token_urlsafe(32)
code_challenge = base64.urlsafe_b64encode(
    hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()

# Get auth URL with PKCE
auth_url = keycloak_openid.auth_url(
    redirect_uri="https://your-app.com/callback",
    scope="openid profile email",
    code_challenge=code_challenge,
    code_challenge_method="S256"
)

# Exchange code with verifier
token = keycloak_openid.token(
    grant_type="authorization_code",
    code="authorization-code",
    redirect_uri="https://your-app.com/callback",
    code_verifier=code_verifier
)
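
The verifier and challenge generation can be wrapped in a small helper that also enforces the 43 to 128 character length RFC 7636 requires for the verifier. This is a sketch using only the standard library; s256_challenge and make_pkce_pair are illustrative names.

```python
import base64
import hashlib
import secrets


def s256_challenge(code_verifier):
    """Compute BASE64URL(SHA256(code_verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def make_pkce_pair(num_bytes=32):
    """Return (code_verifier, code_challenge) for the S256 method."""
    code_verifier = secrets.token_urlsafe(num_bytes)
    # RFC 7636 requires the verifier to be 43-128 characters long
    if not 43 <= len(code_verifier) <= 128:
        raise ValueError("code_verifier must be 43-128 characters long")
    return code_verifier, s256_challenge(code_verifier)


verifier, challenge = make_pkce_pair()
print(len(verifier), len(challenge))  # prints 43 43
```

Store the verifier server-side (for example in the user's session) between the redirect and the code exchange; only the challenge travels in the authorization URL.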

UMA Permissions

# Get UMA permissions for a resource
permissions = keycloak_openid.uma_permissions(
    token['access_token'],
    permissions=["resource_id#scope"]
)

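# Check if user has permission
# has_uma_access() returns an AuthStatus object rather than a bool
auth_status = keycloak_openid.has_uma_access(
    token['access_token'],
    "resource_id#read"
)
has_permission = auth_status.is_authorized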

Custom Headers and Proxies

keycloak_openid = KeycloakOpenID(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="your-client",
    custom_headers={
        "X-Custom-Header": "value",
        "User-Agent": "MyApp/1.0"
    },
    proxies={
        "http": "http://proxy:8080",
        "https": "https://proxy:8080"
    },
    verify=True  # SSL verification (set to False for self-signed certs)
)

Connection Pooling

keycloak_admin = KeycloakAdmin(
    server_url="https://your-cluster.app.skycloak.io",
    username="admin",
    password="admin-password",
    realm_name="your-realm",
    timeout=60,           # Request timeout in seconds
    pool_maxsize=10,      # Max connections in pool
    pool_connections=5    # Pool connections to keep
)

Production Considerations

Environment Variables for Credentials

import os
from keycloak import KeycloakOpenID, KeycloakAdmin

keycloak_openid = KeycloakOpenID(
    server_url=os.environ['KEYCLOAK_SERVER_URL'],
    realm_name=os.environ['KEYCLOAK_REALM'],
    client_id=os.environ['KEYCLOAK_CLIENT_ID'],
    client_secret_key=os.environ.get('KEYCLOAK_CLIENT_SECRET')
)

keycloak_admin = KeycloakAdmin(
    server_url=os.environ['KEYCLOAK_SERVER_URL'],
    username=os.environ.get('KEYCLOAK_ADMIN_USER'),
    password=os.environ.get('KEYCLOAK_ADMIN_PASSWORD'),
    realm_name=os.environ['KEYCLOAK_REALM']
)
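
Reading os.environ directly raises a bare KeyError for the first missing variable only. One option is to validate all required variables up front and report them together. The sketch below assumes the variable names used above; load_keycloak_settings is a hypothetical helper and the demo values are placeholders.

```python
import os

REQUIRED_VARS = ("KEYCLOAK_SERVER_URL", "KEYCLOAK_REALM", "KEYCLOAK_CLIENT_ID")


def load_keycloak_settings(env=None):
    """Build KeycloakOpenID keyword arguments from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "server_url": env["KEYCLOAK_SERVER_URL"],
        "realm_name": env["KEYCLOAK_REALM"],
        "client_id": env["KEYCLOAK_CLIENT_ID"],
        # Optional: public clients have no secret
        "client_secret_key": env.get("KEYCLOAK_CLIENT_SECRET"),
    }


# Demo with an explicit mapping instead of the real environment
demo_env = {
    "KEYCLOAK_SERVER_URL": "https://your-cluster.app.skycloak.io",
    "KEYCLOAK_REALM": "your-realm",
    "KEYCLOAK_CLIENT_ID": "your-client",
}
settings = load_keycloak_settings(demo_env)
print(settings["realm_name"])  # prints your-realm
```

In an application this would be used as KeycloakOpenID(**load_keycloak_settings()).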

Timeout and Retry Configuration

from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Create connection with retry logic
connection = KeycloakOpenIDConnection(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="admin-cli",
    client_secret_key="secret",
    timeout=30  # 30 second timeout
)

# Add retry adapter
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
connection.session.mount("https://", adapter)
connection.session.mount("http://", adapter)

keycloak_admin = KeycloakAdmin(connection=connection)

SSL/TLS Certificate Verification

# Production: Always verify SSL (default)
keycloak_openid = KeycloakOpenID(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="your-client",
    verify=True  # Default
)

# Development with self-signed cert (NOT for production)
keycloak_openid = KeycloakOpenID(
    server_url="https://localhost:8443",
    realm_name="your-realm",
    client_id="your-client",
    verify=False  # Disable SSL verification
)

# Custom CA certificate
keycloak_openid = KeycloakOpenID(
    server_url="https://your-cluster.app.skycloak.io",
    realm_name="your-realm",
    client_id="your-client",
    verify="/path/to/ca-bundle.crt"
)

Error Handling Patterns

from keycloak.exceptions import (
    KeycloakAuthenticationError,
    KeycloakConnectionError,
    KeycloakGetError,
    KeycloakPostError
)

def safe_create_user(admin, user_data):
    """Create user with comprehensive error handling."""
    try:
        user_id = admin.create_user(user_data)
        return {"success": True, "user_id": user_id}

    except KeycloakAuthenticationError as e:
        # Admin credentials invalid or expired
        return {"success": False, "error": "Authentication failed", "details": str(e)}

    except KeycloakConnectionError as e:
        # Network or connectivity issues
        return {"success": False, "error": "Connection failed", "details": str(e)}

    except KeycloakPostError as e:
        # API returned an error (e.g., user exists)
        if e.response_code == 409:
            return {"success": False, "error": "User already exists"}
        return {"success": False, "error": "API error", "details": str(e)}

    except Exception as e:
        return {"success": False, "error": "Unexpected error", "details": str(e)}

Testing

Mock KeycloakOpenID for Unit Tests

from unittest.mock import Mock
import pytest

@pytest.fixture
def mock_keycloak_openid():
    """Create a mock KeycloakOpenID instance."""
    mock = Mock()

    # Mock token response
    mock.token.return_value = {
        'access_token': 'mock-access-token',
        'refresh_token': 'mock-refresh-token',
        'expires_in': 300,
        'token_type': 'Bearer'
    }

    # Mock userinfo response
    mock.userinfo.return_value = {
        'sub': 'user-123',
        'preferred_username': 'testuser',
        'email': 'testuser@example.com',
        'email_verified': True
    }

    # Mock introspect response
    mock.introspect.return_value = {
        'active': True,
        'sub': 'user-123',
        'exp': 1234567890
    }

    return mock

def test_authentication(mock_keycloak_openid):
    """Test authentication flow."""
    token = mock_keycloak_openid.token("testuser", "password")

    assert token['access_token'] == 'mock-access-token'
    mock_keycloak_openid.token.assert_called_once_with("testuser", "password")

Test Fixtures for Token Responses

import pytest
import time

@pytest.fixture
def valid_token_response():
    return {
        'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
        'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
        'expires_in': 300,
        'refresh_expires_in': 1800,
        'token_type': 'Bearer',
        'scope': 'openid profile email'
    }

@pytest.fixture
def decoded_token():
    return {
        'sub': 'user-uuid-123',
        'preferred_username': 'testuser',
        'email': 'testuser@example.com',
        'realm_access': {
            'roles': ['user', 'admin']
        },
        'resource_access': {
            'my-app': {
                'roles': ['editor']
            }
        },
        'exp': int(time.time()) + 300,
        'iat': int(time.time()),
        'iss': 'https://keycloak.example.com/realms/test'
    }

Integration Test Pattern

import pytest
import os

@pytest.fixture(scope="module")
def keycloak_openid():
    """Real Keycloak connection for integration tests."""
    from keycloak import KeycloakOpenID

    return KeycloakOpenID(
        server_url=os.environ.get('TEST_KEYCLOAK_URL', 'http://localhost:8080'),
        realm_name=os.environ.get('TEST_KEYCLOAK_REALM', 'test'),
        client_id=os.environ.get('TEST_KEYCLOAK_CLIENT', 'test-client'),
        client_secret_key=os.environ.get('TEST_KEYCLOAK_SECRET')
    )

@pytest.mark.integration
def test_real_authentication(keycloak_openid):
    """Integration test with real Keycloak."""
    token = keycloak_openid.token(
        os.environ['TEST_USER'],
        os.environ['TEST_PASSWORD']
    )

    assert 'access_token' in token

    userinfo = keycloak_openid.userinfo(token['access_token'])
    assert userinfo['preferred_username'] == os.environ['TEST_USER']

Common Errors & Troubleshooting

PyJWT “Invalid audience”

Error: jwt.exceptions.InvalidAudienceError: Audience doesn't match

Solution: See PyJWT Troubleshooting section above.

401 Unauthorized

Causes:

  • Invalid or expired credentials
  • Missing or incorrect client_secret
  • Token has expired

Solutions:

# Check credentials
print(f"Client ID: {keycloak_openid.client_id}")
print(f"Realm: {keycloak_openid.realm_name}")

# Verify client secret is set for confidential clients
if not keycloak_openid.client_secret_key:
    print("Warning: No client secret set")

# Check token expiration
import time
if token_info['exp'] < time.time():
    print("Token has expired")

403 Forbidden

Causes:

  • User lacks required realm/client roles
  • Service account missing admin permissions

Solutions:

# Check user roles in token
print(f"Realm roles: {token_info.get('realm_access', {}).get('roles', [])}")
print(f"Client roles: {token_info.get('resource_access', {})}")

# For admin operations, ensure service account has:
# - realm-management client roles
# - manage-users, manage-clients, etc.

Connection Errors

Causes:

  • Incorrect server_url
  • Network/firewall issues
  • SSL certificate problems

Solutions:

# Verify server URL format (no trailing slash)
server_url = "https://your-cluster.app.skycloak.io"     # Correct
# server_url = "https://your-cluster.app.skycloak.io/"  # Trailing slash may cause issues
realm_name = "your-realm"

# Test connectivity
import requests
try:
    response = requests.get(
        f"{server_url}/realms/{realm_name}/.well-known/openid-configuration",
        timeout=10
    )
    print(f"Status: {response.status_code}")
except requests.exceptions.SSLError:
    print("SSL certificate verification failed")
except requests.exceptions.ConnectionError:
    print("Cannot connect to server")
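
To rule out URL formatting as the cause, the discovery URL can be built by a helper that tolerates a trailing slash. A minimal sketch, assuming the /realms/{realm}/.well-known/openid-configuration path used in the connectivity test above; well_known_url is a hypothetical name.

```python
from urllib.parse import quote


def well_known_url(server_url, realm_name):
    """Build the OIDC discovery URL, tolerating trailing slashes on server_url."""
    base = server_url.rstrip("/")
    # Percent-encode the realm name in case it contains spaces or slashes
    realm = quote(realm_name, safe="")
    return f"{base}/realms/{realm}/.well-known/openid-configuration"


print(well_known_url("https://your-cluster.app.skycloak.io/", "your-realm"))
```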

Token Decode Failures

Causes:

  • Wrong algorithm specified
  • JWKS endpoint not accessible
  • Token corrupted or truncated

Solutions:

# Get well-known configuration
well_known = keycloak_openid.well_known()
print(f"Supported algorithms: {well_known.get('id_token_signing_alg_values_supported')}")

# Verify JWKS is accessible
import requests
jwks_response = requests.get(well_known['jwks_uri'])
print(f"JWKS status: {jwks_response.status_code}")

# Check token format
parts = token['access_token'].split('.')
if len(parts) != 3:
    print("Token is malformed (should have 3 parts)")

Next Steps