Python-Keycloak Library
This guide covers how to use the python-keycloak library for direct interaction with Keycloak’s admin REST API and OpenID Connect endpoints. Unlike web framework integrations (Django, Flask), this library is designed for backend services, user provisioning, and admin automation tasks.
Prerequisites
- Python 3.10+ (required)
- Skycloak instance with configured realm
- Admin credentials OR service account client with appropriate roles
- Basic understanding of OAuth2/OIDC concepts
Version Compatibility
| python-keycloak | Python | Keycloak |
|---|---|---|
| 7.0.x (latest) | 3.10+ | 22-26 |
| 6.0.x | 3.9+ | 21-25 |
| 5.0.x | 3.8+ | 20-24 |
Note: python-keycloak 7.0+ requires Python 3.10 or higher due to type hints and modern language features.
Installation
pip install python-keycloakThis automatically installs required dependencies:
-
PyJWT- JWT token handling -
cryptography- Cryptographic operations -
requests- HTTP client -
requests-toolbelt- Multipart encoding
Alternative installation via conda-forge:
conda install -c conda-forge python-keycloakPyJWT Troubleshooting
Common Error: “Invalid audience”
When decoding tokens with keycloak_openid.decode_token(), you may encounter:
jwt.exceptions.InvalidAudienceError: Audience doesn't match
This occurs because Keycloak doesn’t include the client_id in the aud (audience) claim by default.
Solution 1: Disable Audience Verification (Quick Fix)
For development or when you control the token source:
from keycloak import KeycloakOpenID
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
client_secret_key="your-secret"
)
# Get a token
token = keycloak_openid.token("username", "password")
# Decode with audience verification disabled
options = {
"verify_aud": False, # Disable audience check
"verify_signature": True, # Keep signature verification
"verify_exp": True # Keep expiration check
}
token_info = keycloak_openid.decode_token(
token['access_token'],
options=options
)Solution 2: Add Audience Mapper (Recommended)
Configure Keycloak to include your client in the audience claim:
- Navigate to Clients → Your Client → Client Scopes tab
- Click on the dedicated scope (e.g.,
your-client-dedicated) - Click Add Mapper → By configuration → Audience
- Configure the mapper:
-
Name:
audience-mapper - Included Client Audience: Select your client
- Add to ID token: Off
- Add to access token: On
-
Name:
- Click Save
After this configuration, tokens will include your client_id in the aud claim, and standard token decoding will work:
# Now works without disabling audience verification
token_info = keycloak_openid.decode_token(token['access_token'])Quick Start: KeycloakOpenID
KeycloakOpenID handles OpenID Connect operations: authentication, token management, and user info retrieval.
Initialization
from keycloak import KeycloakOpenID
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
client_secret_key="your-secret" # Optional for public clients
)Get Token (Resource Owner Password Grant)
# Get token with username/password
token = keycloak_openid.token("username", "password")
print(token['access_token'])
print(token['refresh_token'])
print(token['expires_in'])Client Credentials Grant
# For service-to-service authentication
token = keycloak_openid.token(grant_type="client_credentials")Refresh Token
# Refresh an expired access token
new_token = keycloak_openid.refresh_token(token['refresh_token'])Decode Token
# Decode and validate token
KEYCLOAK_PUBLIC_KEY = (
"-----BEGIN PUBLIC KEY-----\n"
+ keycloak_openid.public_key()
+ "\n-----END PUBLIC KEY-----"
)
token_info = keycloak_openid.decode_token(
token['access_token'],
key=KEYCLOAK_PUBLIC_KEY,
options={"verify_aud": False} # See PyJWT Troubleshooting section
)
print(token_info['sub']) # User ID
print(token_info['preferred_username'])
print(token_info['realm_access']) # Realm rolesGet User Info
# Get user info from userinfo endpoint
userinfo = keycloak_openid.userinfo(token['access_token'])
print(userinfo['email'])
print(userinfo['name'])Introspect Token
# Validate token and get metadata
introspection = keycloak_openid.introspect(token['access_token'])
if introspection['active']:
print(f"Token valid, expires: {introspection['exp']}")
else:
print("Token is invalid or expired")Logout
# Logout user (invalidate refresh token)
keycloak_openid.logout(token['refresh_token'])Quick Start: KeycloakAdmin
KeycloakAdmin provides access to the Admin REST API for user management, realm configuration, and more.
Initialization with Username/Password
from keycloak import KeycloakAdmin
keycloak_admin = KeycloakAdmin(
server_url="https://your-cluster.app.skycloak.io",
username="admin",
password="admin-password",
realm_name="master", # Admin realm
user_realm_name="your-realm" # Target realm for operations
)Initialization with Token
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
# Create connection with existing token
connection = KeycloakOpenIDConnection(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
token=existing_token # Pre-acquired admin token
)
keycloak_admin = KeycloakAdmin(connection=connection)Initialization with Service Account
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
# Use client credentials (service account must have admin roles)
connection = KeycloakOpenIDConnection(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="admin-cli",
client_secret_key="your-service-account-secret"
)
keycloak_admin = KeycloakAdmin(connection=connection)User Management
Create User
# Create a basic user
user_id = keycloak_admin.create_user({
"username": "newuser",
"email": "[email protected]",
"enabled": True,
"firstName": "New",
"lastName": "User",
"credentials": [{
"type": "password",
"value": "initial-password",
"temporary": True # Force password change on first login
}]
})
print(f"Created user with ID: {user_id}")Create User with Attributes
user_id = keycloak_admin.create_user({
"username": "enterprise-user",
"email": "[email protected]",
"enabled": True,
"firstName": "Enterprise",
"lastName": "User",
"emailVerified": True,
"attributes": {
"company": ["Acme Corp"],
"department": ["Engineering"],
"employee_id": ["EMP-12345"]
},
"groups": ["/engineering", "/full-time"],
"realmRoles": ["user", "viewer"]
})Get Users
# Get all users
users = keycloak_admin.get_users({})
# Get users with filtering
users = keycloak_admin.get_users({
"email": "[email protected]"
})
# Get users with pagination
users = keycloak_admin.get_users({
"first": 0, # Offset
"max": 100, # Limit
"search": "john" # Search by username, email, first/last name
})
# Count total users
count = keycloak_admin.users_count()
print(f"Total users: {count}")Get User by ID
user = keycloak_admin.get_user(user_id)
print(user['username'])
print(user['email'])
print(user['attributes'])Update User
# Update user properties
keycloak_admin.update_user(user_id, {
"firstName": "Updated",
"lastName": "Name",
"attributes": {
"department": ["Marketing"] # Overwrites existing
}
})Enable/Disable User
# Disable user
keycloak_admin.update_user(user_id, {"enabled": False})
# Enable user
keycloak_admin.update_user(user_id, {"enabled": True})Delete User
keycloak_admin.delete_user(user_id)Set User Password
# Set permanent password
keycloak_admin.set_user_password(
user_id=user_id,
password="new-secure-password",
temporary=False
)
# Set temporary password (forces change on next login)
keycloak_admin.set_user_password(
user_id=user_id,
password="temp-password",
temporary=True
)Send Verification Email
# Send email verification
keycloak_admin.send_verify_email(user_id)
# Send password reset email
keycloak_admin.send_update_account(
user_id=user_id,
payload=["UPDATE_PASSWORD"]
)Role Management
Get Realm Roles
# Get all realm roles
roles = keycloak_admin.get_realm_roles()
for role in roles:
print(f"{role['name']}: {role.get('description', 'No description')}")
# Get specific role
admin_role = keycloak_admin.get_realm_role("admin")Create Realm Role
keycloak_admin.create_realm_role({
"name": "api_user",
"description": "Can access API endpoints",
"composite": False
})Assign Realm Roles to User
# Get role objects
roles = keycloak_admin.get_realm_roles()
user_role = next(r for r in roles if r['name'] == 'user')
api_role = next(r for r in roles if r['name'] == 'api_user')
# Assign roles
keycloak_admin.assign_realm_roles(
user_id=user_id,
roles=[user_role, api_role]
)Get User’s Realm Roles
user_roles = keycloak_admin.get_realm_roles_of_user(user_id)
for role in user_roles:
print(role['name'])Remove Realm Roles from User
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=[api_role]
)Client Roles
# Get client ID (internal ID, not client_id)
clients = keycloak_admin.get_clients()
my_client = next(c for c in clients if c['clientId'] == 'my-app')
client_internal_id = my_client['id']
# Get client roles
client_roles = keycloak_admin.get_client_roles(client_internal_id)
# Create client role
keycloak_admin.create_client_role(
client_internal_id,
{"name": "editor", "description": "Can edit content"}
)
# Assign client role to user
editor_role = keycloak_admin.get_client_role(client_internal_id, "editor")
keycloak_admin.assign_client_role(
user_id=user_id,
client_id=client_internal_id,
roles=[editor_role]
)Token Operations
Token Generation Flows
# Password grant (user authentication)
token = keycloak_openid.token("username", "password")
# Client credentials (service authentication)
token = keycloak_openid.token(grant_type="client_credentials")
# Authorization code (web flow)
# First, redirect user to:
auth_url = keycloak_openid.auth_url(
redirect_uri="https://your-app.com/callback",
scope="openid profile email"
)
# Then exchange code for token:
token = keycloak_openid.token(
grant_type="authorization_code",
code="authorization-code-from-callback",
redirect_uri="https://your-app.com/callback"
)Token Validation with JWKS
import requests
from jose import jwt
# Get JWKS (JSON Web Key Set)
jwks_url = f"{keycloak_openid.well_known()['jwks_uri']}"
jwks = requests.get(jwks_url).json()
# Validate token with JWKS
try:
claims = jwt.decode(
token['access_token'],
jwks,
algorithms=['RS256'],
audience=keycloak_openid.client_id,
issuer=keycloak_openid.well_known()['issuer']
)
print(f"Valid token for user: {claims['sub']}")
except jwt.JWTError as e:
print(f"Invalid token: {e}")Token Refresh Pattern
import time
class TokenManager:
def __init__(self, keycloak_openid):
self.kc = keycloak_openid
self.token = None
self.token_expiry = 0
def get_token(self):
"""Get valid access token, refreshing if needed."""
current_time = time.time()
# No token yet
if self.token is None:
raise ValueError("No token available. Call authenticate() first.")
# Token expired or about to expire (30s buffer)
if current_time >= self.token_expiry - 30:
self._refresh_token()
return self.token['access_token']
def authenticate(self, username, password):
"""Initial authentication."""
self.token = self.kc.token(username, password)
self.token_expiry = time.time() + self.token['expires_in']
def _refresh_token(self):
"""Refresh the access token."""
try:
self.token = self.kc.refresh_token(self.token['refresh_token'])
self.token_expiry = time.time() + self.token['expires_in']
except Exception as e:
# Refresh token expired, need re-authentication
raise ValueError(f"Token refresh failed: {e}")Error Handling for Expired Tokens
from keycloak.exceptions import KeycloakAuthenticationError
def make_authenticated_request(token_manager, request_func):
"""Make request with automatic token refresh."""
try:
token = token_manager.get_token()
return request_func(token)
except KeycloakAuthenticationError:
# Token invalid, try refresh
token_manager._refresh_token()
token = token_manager.get_token()
return request_func(token)Advanced Features
Async Operations
python-keycloak 7.0+ supports async methods with the a_ prefix:
import asyncio
from keycloak import KeycloakOpenID
async def async_auth():
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
client_secret_key="your-secret"
)
# Async token request
token = await keycloak_openid.a_token("username", "password")
# Async userinfo
userinfo = await keycloak_openid.a_userinfo(token['access_token'])
# Async introspect
introspection = await keycloak_openid.a_introspect(token['access_token'])
return userinfo
# Run async
result = asyncio.run(async_auth())PKCE Flow Support
import secrets
import hashlib
import base64
# Generate PKCE verifier and challenge
code_verifier = secrets.token_urlsafe(32)
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
# Get auth URL with PKCE
auth_url = keycloak_openid.auth_url(
redirect_uri="https://your-app.com/callback",
scope="openid profile email",
code_challenge=code_challenge,
code_challenge_method="S256"
)
# Exchange code with verifier
token = keycloak_openid.token(
grant_type="authorization_code",
code="authorization-code",
redirect_uri="https://your-app.com/callback",
code_verifier=code_verifier
)UMA Permissions
# Get UMA permissions for a resource
permissions = keycloak_openid.uma_permissions(
token['access_token'],
permissions=["resource_id#scope"]
)
# Check if user has permission
has_permission = keycloak_openid.has_uma_access(
token['access_token'],
"resource_id#read"
)Custom Headers and Proxies
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
custom_headers={
"X-Custom-Header": "value",
"User-Agent": "MyApp/1.0"
},
proxies={
"http": "http://proxy:8080",
"https": "https://proxy:8080"
},
verify=True # SSL verification (set to False for self-signed certs)
)Connection Pooling
keycloak_admin = KeycloakAdmin(
server_url="https://your-cluster.app.skycloak.io",
username="admin",
password="admin-password",
realm_name="your-realm",
timeout=60, # Request timeout in seconds
pool_maxsize=10, # Max connections in pool
pool_connections=5 # Pool connections to keep
)Production Considerations
Environment Variables for Credentials
import os
from keycloak import KeycloakOpenID, KeycloakAdmin
keycloak_openid = KeycloakOpenID(
server_url=os.environ['KEYCLOAK_SERVER_URL'],
realm_name=os.environ['KEYCLOAK_REALM'],
client_id=os.environ['KEYCLOAK_CLIENT_ID'],
client_secret_key=os.environ.get('KEYCLOAK_CLIENT_SECRET')
)
keycloak_admin = KeycloakAdmin(
server_url=os.environ['KEYCLOAK_SERVER_URL'],
username=os.environ.get('KEYCLOAK_ADMIN_USER'),
password=os.environ.get('KEYCLOAK_ADMIN_PASSWORD'),
realm_name=os.environ['KEYCLOAK_REALM']
)Timeout and Retry Configuration
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Create connection with retry logic
connection = KeycloakOpenIDConnection(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="admin-cli",
client_secret_key="secret",
timeout=30 # 30 second timeout
)
# Add retry adapter
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
connection.session.mount("https://", adapter)
connection.session.mount("http://", adapter)
keycloak_admin = KeycloakAdmin(connection=connection)SSL/TLS Certificate Verification
# Production: Always verify SSL (default)
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
verify=True # Default
)
# Development with self-signed cert (NOT for production)
keycloak_openid = KeycloakOpenID(
server_url="https://localhost:8443",
realm_name="your-realm",
client_id="your-client",
verify=False # Disable SSL verification
)
# Custom CA certificate
keycloak_openid = KeycloakOpenID(
server_url="https://your-cluster.app.skycloak.io",
realm_name="your-realm",
client_id="your-client",
verify="/path/to/ca-bundle.crt"
)Error Handling Patterns
from keycloak.exceptions import (
KeycloakAuthenticationError,
KeycloakConnectionError,
KeycloakGetError,
KeycloakPostError
)
def safe_create_user(admin, user_data):
"""Create user with comprehensive error handling."""
try:
user_id = admin.create_user(user_data)
return {"success": True, "user_id": user_id}
except KeycloakAuthenticationError as e:
# Admin credentials invalid or expired
return {"success": False, "error": "Authentication failed", "details": str(e)}
except KeycloakConnectionError as e:
# Network or connectivity issues
return {"success": False, "error": "Connection failed", "details": str(e)}
except KeycloakPostError as e:
# API returned an error (e.g., user exists)
if e.response_code == 409:
return {"success": False, "error": "User already exists"}
return {"success": False, "error": "API error", "details": str(e)}
except Exception as e:
return {"success": False, "error": "Unexpected error", "details": str(e)}Testing
Mock KeycloakOpenID for Unit Tests
from unittest.mock import Mock, patch
import pytest
@pytest.fixture
def mock_keycloak_openid():
"""Create a mock KeycloakOpenID instance."""
mock = Mock()
# Mock token response
mock.token.return_value = {
'access_token': 'mock-access-token',
'refresh_token': 'mock-refresh-token',
'expires_in': 300,
'token_type': 'Bearer'
}
# Mock userinfo response
mock.userinfo.return_value = {
'sub': 'user-123',
'preferred_username': 'testuser',
'email': '[email protected]',
'email_verified': True
}
# Mock introspect response
mock.introspect.return_value = {
'active': True,
'sub': 'user-123',
'exp': 1234567890
}
return mock
def test_authentication(mock_keycloak_openid):
"""Test authentication flow."""
token = mock_keycloak_openid.token("testuser", "password")
assert token['access_token'] == 'mock-access-token'
mock_keycloak_openid.token.assert_called_once_with("testuser", "password")Test Fixtures for Token Responses
import pytest
import time
@pytest.fixture
def valid_token_response():
return {
'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
'expires_in': 300,
'refresh_expires_in': 1800,
'token_type': 'Bearer',
'scope': 'openid profile email'
}
@pytest.fixture
def decoded_token():
return {
'sub': 'user-uuid-123',
'preferred_username': 'testuser',
'email': '[email protected]',
'realm_access': {
'roles': ['user', 'admin']
},
'resource_access': {
'my-app': {
'roles': ['editor']
}
},
'exp': int(time.time()) + 300,
'iat': int(time.time()),
'iss': 'https://keycloak.example.com/realms/test'
}Integration Test Pattern
import pytest
import os
@pytest.fixture(scope="module")
def keycloak_openid():
"""Real Keycloak connection for integration tests."""
from keycloak import KeycloakOpenID
return KeycloakOpenID(
server_url=os.environ.get('TEST_KEYCLOAK_URL', 'http://localhost:8080'),
realm_name=os.environ.get('TEST_KEYCLOAK_REALM', 'test'),
client_id=os.environ.get('TEST_KEYCLOAK_CLIENT', 'test-client'),
client_secret_key=os.environ.get('TEST_KEYCLOAK_SECRET')
)
@pytest.mark.integration
def test_real_authentication(keycloak_openid):
"""Integration test with real Keycloak."""
token = keycloak_openid.token(
os.environ['TEST_USER'],
os.environ['TEST_PASSWORD']
)
assert 'access_token' in token
userinfo = keycloak_openid.userinfo(token['access_token'])
assert userinfo['preferred_username'] == os.environ['TEST_USER']Common Errors & Troubleshooting
PyJWT “Invalid audience”
Error: jwt.exceptions.InvalidAudienceError: Audience doesn't match
Solution: See PyJWT Troubleshooting section above.
401 Unauthorized
Causes:
- Invalid or expired credentials
- Missing or incorrect
client_secret - Token has expired
Solutions:
# Check credentials
print(f"Client ID: {keycloak_openid.client_id}")
print(f"Realm: {keycloak_openid.realm_name}")
# Verify client secret is set for confidential clients
if not keycloak_openid.client_secret_key:
print("Warning: No client secret set")
# Check token expiration
import time
if token_info['exp'] < time.time():
print("Token has expired")403 Forbidden
Causes:
- User lacks required realm/client roles
- Service account missing admin permissions
Solutions:
# Check user roles in token
print(f"Realm roles: {token_info.get('realm_access', {}).get('roles', [])}")
print(f"Client roles: {token_info.get('resource_access', {})}")
# For admin operations, ensure service account has:
# - realm-management client roles
# - manage-users, manage-clients, etc.Connection Errors
Causes:
- Incorrect
server_url - Network/firewall issues
- SSL certificate problems
Solutions:
# Verify server URL format (no trailing slash)
server_url = "https://your-cluster.app.skycloak.io" # Correct
server_url = "https://your-cluster.app.skycloak.io/" # May cause issues
# Test connectivity
import requests
try:
response = requests.get(f"{server_url}/realms/{realm_name}/.well-known/openid-configuration")
print(f"Status: {response.status_code}")
except requests.exceptions.SSLError:
print("SSL certificate verification failed")
except requests.exceptions.ConnectionError:
print("Cannot connect to server")Token Decode Failures
Causes:
- Wrong algorithm specified
- JWKS endpoint not accessible
- Token corrupted or truncated
Solutions:
# Get well-known configuration
well_known = keycloak_openid.well_known()
print(f"Supported algorithms: {well_known.get('id_token_signing_alg_values_supported')}")
# Verify JWKS is accessible
import requests
jwks_response = requests.get(well_known['jwks_uri'])
print(f"JWKS status: {jwks_response.status_code}")
# Check token format
parts = token['access_token'].split('.')
if len(parts) != 3:
print("Token is malformed (should have 3 parts)")Next Steps
- Django Integration - Full web application authentication with Django
- User Management Features - Learn about Skycloak’s user management capabilities
- Identity Providers - Configure social and enterprise login