JWT Token Validation Best Practices
JWT Token Validation Best Practices
JWT (JSON Web Token) validation is critical for securing your applications. This tutorial shows you exactly how to validate Keycloak JWTs properly, avoid common security pitfalls, and implement best practices that scale.
Understanding Keycloak JWTs
JWT Structure
Keycloak issues JWTs with three parts:
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IjEyMzQ1Njc4OTAifQ.eyJleHAiOjE2NDI1MjE2MDAsImlhdCI6MTY0MjUxODAwMCwianRpIjoiYWJjZGVmZ2giLCJpc3MiOiJodHRwczovL2tleWNsb2FrLmV4YW1wbGUuY29tL2F1dGgvcmVhbG1zL215cmVhbG0iLCJhdWQiOlsibXktYXBwIiwiYWNjb3VudCJdLCJzdWIiOiIxMjM0NTY3OC05MGFiLWNkZWYtMTIzNC01Njc4OTBhYmNkZWYiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJteS1hcHAiLCJzZXNzaW9uX3N0YXRlIjoiMTIzNDU2NzgtOTBhYi1jZGVmLTEyMzQtNTY3ODkwYWJjZGVmIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJ1c2VyIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsibXktYXBwIjp7InJvbGVzIjpbImFkbWluIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwgcHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJuYW1lIjoiSm9obiBEb2UiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoiam9obkBleGFtcGxlLmNvbSJ9.signature
Decoded payload:
{
"exp": 1642521600,
"iat": 1642518000,
"jti": "abcdefgh",
"iss": "https://keycloak.example.com/realms/myrealm",
"aud": ["my-app", "account"],
"sub": "12345678-90ab-cdef-1234-567890abcdef",
"typ": "Bearer",
"azp": "my-app",
"session_state": "12345678-90ab-cdef-1234-567890abcdef",
"acr": "1",
"realm_access": {
"roles": ["user"]
},
"resource_access": {
"my-app": {
"roles": ["admin"]
}
},
"scope": "openid email profile",
"email_verified": true,
"name": "John Doe",
"preferred_username": "johndoe",
"given_name": "John",
"family_name": "Doe",
"email": "[email protected]"
}The Five Pillars of JWT Validation
1. Signature Verification
Always verify the signature first!
2. Issuer Validation
Ensure the token comes from your Keycloak instance.
3. Audience Validation
Verify the token is intended for your application.
4. Expiration Check
Never accept expired tokens.
5. Additional Claims
Validate any business-critical claims.
Implementation Examples
Node.js/Express
Complete validation middleware:
const jwksClient = require('jwks-rsa');
const jwt = require('jsonwebtoken');
// JWKS client for key rotation
const client = jwksClient({
jwksUri: `${process.env.KEYCLOAK_URL}/realms/${process.env.REALM}/protocol/openid-connect/certs`,
cache: true,
cacheMaxEntries: 5,
cacheMaxAge: 600000, // 10 minutes
rateLimit: true,
jwksRequestsPerMinute: 10
});
// Get signing key
function getKey(header, callback) {
client.getSigningKey(header.kid, (err, key) => {
if (err) {
return callback(err);
}
const signingKey = key.publicKey || key.rsaPublicKey;
callback(null, signingKey);
});
}
// Validation middleware
const validateToken = (req, res, next) => {
const token = extractToken(req);
if (!token) {
return res.status(401).json({ error: 'No token provided' });
}
// Verify token
jwt.verify(token, getKey, {
// Essential security options
algorithms: ['RS256'], // Only allow RS256
issuer: `${process.env.KEYCLOAK_URL}/realms/${process.env.REALM}`,
audience: process.env.CLIENT_ID,
clockTolerance: 30 // 30 seconds clock skew tolerance
}, (err, decoded) => {
if (err) {
console.error('Token validation error:', err.message);
return res.status(401).json({ error: 'Invalid token' });
}
// Additional validation
if (!validateAdditionalClaims(decoded)) {
return res.status(403).json({ error: 'Insufficient permissions' });
}
// Attach user info to request
req.user = decoded;
next();
});
};
// Extract token from various sources
function extractToken(req) {
// 1. Authorization header (preferred)
const authHeader = req.headers.authorization;
if (authHeader && authHeader.startsWith('Bearer ')) {
return authHeader.substring(7);
}
// 2. Cookie (for web apps)
if (req.cookies && req.cookies.access_token) {
return req.cookies.access_token;
}
// 3. Query parameter (use with caution!)
if (req.query.access_token) {
console.warn('Token in query parameter - security risk!');
return req.query.access_token;
}
return null;
}
// Additional business logic validation
function validateAdditionalClaims(decoded) {
// Check if email is verified
if (!decoded.email_verified) {
return false;
}
// Check required roles
const requiredRole = 'user';
const realmRoles = decoded.realm_access?.roles || [];
if (!realmRoles.includes(requiredRole)) {
return false;
}
// Check custom claims
if (decoded.organization && !ALLOWED_ORGS.includes(decoded.organization)) {
return false;
}
return true;
}
// Use the middleware
app.use('/api/protected', validateToken);Python/FastAPI
Async validation with caching:
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import httpx
from jose import jwt, JWTError
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from cachetools import TTLCache
import asyncio
app = FastAPI()
security = HTTPBearer()
# Configuration
KEYCLOAK_URL = "https://keycloak.example.com"
REALM = "myrealm"
CLIENT_ID = "my-app"
JWKS_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/certs"
# Cache for JWKS keys
jwks_cache = TTLCache(maxsize=5, ttl=600) # 10 minutes
jwks_lock = asyncio.Lock()
class TokenValidator:
def __init__(self):
self.http_client = httpx.AsyncClient()
async def get_signing_key(self, token: str) -> Dict[str, Any]:
"""Get the signing key for token verification."""
# Parse header without verification
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
if not kid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token missing key ID"
)
# Check cache first
if kid in jwks_cache:
return jwks_cache[kid]
# Fetch JWKS with lock to prevent thundering herd
async with jwks_lock:
# Double-check cache after acquiring lock
if kid in jwks_cache:
return jwks_cache[kid]
try:
response = await self.http_client.get(JWKS_URL)
response.raise_for_status()
jwks = response.json()
# Find the key with matching kid
for key in jwks["keys"]:
if key["kid"] == kid:
jwks_cache[kid] = key
return key
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Signing key not found"
)
except httpx.HTTPError as e:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to fetch JWKS: {str(e)}"
)
async def validate_token(self, credentials: HTTPAuthorizationCredentials) -> Dict[str, Any]:
"""Validate JWT token with all security checks."""
token = credentials.credentials
try:
# Get signing key
signing_key = await self.get_signing_key(token)
# Verify token
payload = jwt.decode(
token,
signing_key,
algorithms=["RS256"],
issuer=f"{KEYCLOAK_URL}/realms/{REALM}",
audience=CLIENT_ID,
options={
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"verify_aud": True,
"verify_iss": True,
"require_exp": True,
"require_iat": True,
"require_aud": True,
}
)
# Additional validation
self._validate_additional_claims(payload)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token validation failed: {str(e)}"
)
def _validate_additional_claims(self, payload: Dict[str, Any]):
"""Validate business-specific claims."""
# Check email verification
if not payload.get("email_verified", False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email not verified"
)
# Check token type
if payload.get("typ") != "Bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type"
)
# Check required scopes
required_scopes = {"openid", "profile"}
token_scopes = set(payload.get("scope", "").split())
if not required_scopes.issubset(token_scopes):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient scopes"
)
# Create validator instance
validator = TokenValidator()
# Dependency for protected routes
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Dict[str, Any]:
"""Get current user from valid token."""
return await validator.validate_token(credentials)
# Protected endpoint example
@app.get("/api/protected")
async def protected_route(current_user: Dict[str, Any] = Depends(get_current_user)):
return {
"message": "Access granted",
"user": current_user["preferred_username"],
"email": current_user["email"]
}
# Role-based protection
def require_role(role: str):
"""Create a dependency that requires a specific role."""
async def role_checker(current_user: Dict[str, Any] = Depends(get_current_user)):
realm_roles = current_user.get("realm_access", {}).get("roles", [])
if role not in realm_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{role}' required"
)
return current_user
return role_checker
@app.get("/api/admin")
async def admin_only(current_user: Dict[str, Any] = Depends(require_role("admin"))):
return {"message": "Admin access granted"}Java/Spring Boot
Production-ready configuration:
@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig {
@Value("${keycloak.jwk-set-uri}")
private String jwkSetUri;
@Value("${keycloak.issuer}")
private String issuer;
@Value("${keycloak.audience}")
private String audience;
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.cors().and()
.csrf().disable()
.sessionManagement()
.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.and()
.authorizeRequests()
.antMatchers("/api/public/**").permitAll()
.antMatchers("/api/admin/**").hasRole("ADMIN")
.anyRequest().authenticated()
.and()
.oauth2ResourceServer()
.jwt()
.jwtAuthenticationConverter(jwtAuthenticationConverter());
return http.build();
}
@Bean
public JwtDecoder jwtDecoder() {
NimbusJwtDecoder decoder = NimbusJwtDecoder
.withJwkSetUri(jwkSetUri)
.cache(Duration.ofMinutes(5))
.build();
// Add custom validators
decoder.setJwtValidator(jwtValidator());
return decoder;
}
@Bean
public OAuth2TokenValidator<Jwt> jwtValidator() {
List<OAuth2TokenValidator<Jwt>> validators = new ArrayList<>();
// Standard validators
validators.add(new JwtTimestampValidator());
validators.add(new JwtIssuerValidator(issuer));
// Custom validators
validators.add(new JwtAudienceValidator(audience));
validators.add(new EmailVerifiedValidator());
validators.add(new CustomClaimsValidator());
return new DelegatingOAuth2TokenValidator<>(validators);
}
@Bean
public JwtAuthenticationConverter jwtAuthenticationConverter() {
JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
converter.setJwtGrantedAuthoritiesConverter(jwt -> {
// Extract roles from Keycloak token
Collection<GrantedAuthority> authorities = new ArrayList<>();
// Realm roles
Map<String, Object> realmAccess = jwt.getClaimAsMap("realm_access");
if (realmAccess != null && realmAccess.containsKey("roles")) {
List<String> roles = (List<String>) realmAccess.get("roles");
roles.stream()
.map(role -> new SimpleGrantedAuthority("ROLE_" + role.toUpperCase()))
.forEach(authorities::add);
}
// Client roles
Map<String, Object> resourceAccess = jwt.getClaimAsMap("resource_access");
if (resourceAccess != null && resourceAccess.containsKey(audience)) {
Map<String, Object> clientAccess = (Map<String, Object>) resourceAccess.get(audience);
if (clientAccess.containsKey("roles")) {
List<String> clientRoles = (List<String>) clientAccess.get("roles");
clientRoles.stream()
.map(role -> new SimpleGrantedAuthority("ROLE_CLIENT_" + role.toUpperCase()))
.forEach(authorities::add);
}
}
return authorities;
});
return converter;
}
}
// Custom validators
@Component
public class JwtAudienceValidator implements OAuth2TokenValidator<Jwt> {
private final String expectedAudience;
public JwtAudienceValidator(String expectedAudience) {
this.expectedAudience = expectedAudience;
}
@Override
public OAuth2TokenValidatorResult validate(Jwt jwt) {
List<String> audiences = jwt.getAudience();
if (audiences != null && audiences.contains(expectedAudience)) {
return OAuth2TokenValidatorResult.success();
}
return OAuth2TokenValidatorResult.failure(new OAuth2Error(
"invalid_audience",
"The required audience is missing",
null
));
}
}
@Component
public class EmailVerifiedValidator implements OAuth2TokenValidator<Jwt> {
@Override
public OAuth2TokenValidatorResult validate(Jwt jwt) {
Boolean emailVerified = jwt.getClaim("email_verified");
if (Boolean.TRUE.equals(emailVerified)) {
return OAuth2TokenValidatorResult.success();
}
return OAuth2TokenValidatorResult.failure(new OAuth2Error(
"email_not_verified",
"Email verification required",
null
));
}
}Common Security Mistakes
1. ❌ Not Validating Signature
Bad:
// NEVER DO THIS!
const decoded = jwt.decode(token); // No signature verification!
if (decoded.exp > Date.now() / 1000) {
// "Validated"
}Good:
jwt.verify(token, publicKey, { algorithms: ['RS256'] });2. ❌ Accepting Any Algorithm
Bad:
// Vulnerable to algorithm confusion attack
jwt.verify(token, publicKey); // No algorithm specified!
Good:
jwt.verify(token, publicKey, { algorithms: ['RS256'] });3. ❌ Not Checking Audience
Bad:
// Token might be for different application!
jwt.verify(token, publicKey, {
issuer: keycloakIssuer
// Missing audience check
});Good:
jwt.verify(token, publicKey, {
issuer: keycloakIssuer,
audience: 'my-app'
});4. ❌ Storing Tokens Insecurely
Bad:
// Vulnerable to XSS
localStorage.setItem('token', accessToken);Good:
// HttpOnly cookie (set by server)
res.cookie('access_token', token, {
httpOnly: true,
secure: true,
sameSite: 'strict',
maxAge: 15 * 60 * 1000 // 15 minutes
});Performance Best Practices
1. Cache JWKS Keys
const NodeCache = require('node-cache');
const jwksCache = new NodeCache({
stdTTL: 600, // 10 minutes
checkperiod: 120
});
async function getSigningKey(kid) {
// Check cache first
const cached = jwksCache.get(kid);
if (cached) return cached;
// Fetch from Keycloak
const key = await fetchKeyFromKeycloak(kid);
jwksCache.set(kid, key);
return key;
}2. Validate Once Per Request
// Middleware validates once
app.use(validateToken);
// Controllers trust req.user
app.get('/api/profile', (req, res) => {
// No need to re-validate
res.json({ user: req.user });
});3. Async Validation
// Process multiple validations concurrently
const validationPromises = tokens.map(token =>
validateToken(token).catch(err => ({ error: err }))
);
const results = await Promise.all(validationPromises);Advanced Validation Scenarios
Multi-Tenant Validation
function validateMultiTenantToken(token, tenantId) {
const issuer = `${KEYCLOAK_URL}/realms/${tenantId}`;
const jwksUri = `${issuer}/protocol/openid-connect/certs`;
return jwt.verify(token, getKey(jwksUri), {
issuer: issuer,
audience: CLIENT_ID
});
}Refresh Token Handling
async function refreshAccessToken(refreshToken) {
try {
const response = await axios.post(
`${KEYCLOAK_URL}/realms/${REALM}/protocol/openid-connect/token`,
new URLSearchParams({
grant_type: 'refresh_token',
refresh_token: refreshToken,
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET
})
);
return response.data;
} catch (error) {
if (error.response?.status === 400) {
// Refresh token expired or invalid
throw new Error('Session expired. Please login again.');
}
throw error;
}
}Token Introspection
For opaque tokens or additional validation:
async function introspectToken(token) {
const response = await axios.post(
`${KEYCLOAK_URL}/realms/${REALM}/protocol/openid-connect/token/introspect`,
new URLSearchParams({
token: token,
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET
})
);
if (!response.data.active) {
throw new Error('Token is not active');
}
return response.data;
}Testing JWT Validation
Unit Tests
describe('JWT Validation', () => {
it('should reject expired tokens', async () => {
const expiredToken = generateTestToken({ exp: Date.now() / 1000 - 3600 });
await expect(validateToken(expiredToken))
.rejects
.toThrow('jwt expired');
});
it('should reject tokens with wrong audience', async () => {
const wrongAudToken = generateTestToken({ aud: 'wrong-app' });
await expect(validateToken(wrongAudToken))
.rejects
.toThrow('jwt audience invalid');
});
it('should accept valid tokens', async () => {
const validToken = generateTestToken({
iss: `${KEYCLOAK_URL}/realms/${REALM}`,
aud: CLIENT_ID,
exp: Date.now() / 1000 + 3600
});
const decoded = await validateToken(validToken);
expect(decoded.sub).toBeDefined();
});
});Integration Tests
describe('Protected Endpoints', () => {
it('should return 401 without token', async () => {
const response = await request(app)
.get('/api/protected')
.expect(401);
expect(response.body.error).toBe('No token provided');
});
it('should return 200 with valid token', async () => {
const token = await getTestToken();
const response = await request(app)
.get('/api/protected')
.set('Authorization', `Bearer ${token}`)
.expect(200);
expect(response.body.message).toBe('Access granted');
});
});Monitoring and Alerting
Track Validation Metrics
const promClient = require('prom-client');
// Metrics
const validationCounter = new promClient.Counter({
name: 'jwt_validations_total',
help: 'Total JWT validations',
labelNames: ['status', 'reason']
});
const validationDuration = new promClient.Histogram({
name: 'jwt_validation_duration_seconds',
help: 'JWT validation duration'
});
// Track in middleware
const validateTokenWithMetrics = async (req, res, next) => {
const end = validationDuration.startTimer();
try {
await validateToken(req, res, next);
validationCounter.inc({ status: 'success' });
} catch (error) {
validationCounter.inc({
status: 'failure',
reason: error.message
});
throw error;
} finally {
end();
}
};Security Checklist
- Always verify JWT signature
- Validate issuer (iss) claim
- Validate audience (aud) claim
- Check expiration (exp) claim
- Validate not-before (nbf) claim if present
- Use HTTPS everywhere
- Implement key rotation
- Cache JWKS responsibly
- Handle clock skew appropriately
- Validate custom claims for business logic
- Never log full JWT tokens
- Implement proper error handling
- Monitor validation failures
- Keep libraries updated
- Test edge cases thoroughly
Next Steps
- User Management - Manage roles and permissions
- Security Features - Protect your endpoints
- Integration Guides - Framework-specific implementations