JWT Token Validation Best Practices

JWT Token Validation Best Practices

JWT (JSON Web Token) validation is critical for securing your applications. This tutorial shows you exactly how to validate Keycloak JWTs properly, avoid common security pitfalls, and implement best practices that scale.

Understanding Keycloak JWTs

JWT Structure

Keycloak issues JWTs with three Base64URL-encoded parts separated by dots (header, payload, and signature):

eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IjEyMzQ1Njc4OTAifQ.eyJleHAiOjE2NDI1MjE2MDAsImlhdCI6MTY0MjUxODAwMCwianRpIjoiYWJjZGVmZ2giLCJpc3MiOiJodHRwczovL2tleWNsb2FrLmV4YW1wbGUuY29tL2F1dGgvcmVhbG1zL215cmVhbG0iLCJhdWQiOlsibXktYXBwIiwiYWNjb3VudCJdLCJzdWIiOiIxMjM0NTY3OC05MGFiLWNkZWYtMTIzNC01Njc4OTBhYmNkZWYiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJteS1hcHAiLCJzZXNzaW9uX3N0YXRlIjoiMTIzNDU2NzgtOTBhYi1jZGVmLTEyMzQtNTY3ODkwYWJjZGVmIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJ1c2VyIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsibXktYXBwIjp7InJvbGVzIjpbImFkbWluIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwgcHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJuYW1lIjoiSm9obiBEb2UiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoiam9obkBleGFtcGxlLmNvbSJ9.signature

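Decoded payload (note: the sample token above encodes its issuer with the legacy `/auth/realms/...` path; the `iss` shown here uses the `/realms/...` form that the rest of this tutorial validates against):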

{
  "exp": 1642521600,
  "iat": 1642518000,
  "jti": "abcdefgh",
  "iss": "https://keycloak.example.com/realms/myrealm",
  "aud": ["my-app", "account"],
  "sub": "12345678-90ab-cdef-1234-567890abcdef",
  "typ": "Bearer",
  "azp": "my-app",
  "session_state": "12345678-90ab-cdef-1234-567890abcdef",
  "acr": "1",
  "realm_access": {
    "roles": ["user"]
  },
  "resource_access": {
    "my-app": {
      "roles": ["admin"]
    }
  },
  "scope": "openid email profile",
  "email_verified": true,
  "name": "John Doe",
  "preferred_username": "johndoe",
  "given_name": "John",
  "family_name": "Doe",
  "email": "john@example.com"
}

The Five Pillars of JWT Validation

1. Signature Verification

Always verify the signature first!

2. Issuer Validation

Ensure the token comes from your Keycloak instance.

3. Audience Validation

Verify the token is intended for your application.

4. Expiration Check

Never accept expired tokens.

5. Additional Claims

Validate any business-critical claims.

Implementation Examples

Node.js/Express

Complete validation middleware:

const jwksClient = require('jwks-rsa');
const jwt = require('jsonwebtoken');

// JWKS client for key rotation: loads the realm's signing keys from the
// openid-connect/certs endpoint built from the KEYCLOAK_URL and REALM env vars.
// Caching and rate limiting are both switched on so that key lookups do not
// hit Keycloak on every request.
const client = jwksClient({
  jwksUri: `${process.env.KEYCLOAK_URL}/realms/${process.env.REALM}/protocol/openid-connect/certs`,
  cache: true,
  cacheMaxEntries: 5,
  cacheMaxAge: 600000, // 10 minutes
  rateLimit: true,
  jwksRequestsPerMinute: 10
});

// Resolve the public key matching the token header's key ID (kid).
// Signature follows the (header, callback) key-resolver shape that is handed
// to jwt.verify: callback(err) on lookup failure, callback(null, key) otherwise.
function getKey(header, callback) {
  const onSigningKey = (lookupError, signingKeyEntry) => {
    if (lookupError) {
      return callback(lookupError);
    }
    // The key entry exposes its key as either publicKey or rsaPublicKey.
    const publicKeyPem = signingKeyEntry.publicKey || signingKeyEntry.rsaPublicKey;
    callback(null, publicKeyPem);
  };

  client.getSigningKey(header.kid, onSigningKey);
}

// Validation middleware: authenticates the request's JWT before the request
// reaches a protected handler. Responds 401 when the token is missing or fails
// verification, 403 when the additional claim checks fail; otherwise stores
// the decoded claims on req.user and calls next().
const validateToken = (req, res, next) => {
  const bearerToken = extractToken(req);
  if (!bearerToken) {
    return res.status(401).json({ error: 'No token provided' });
  }

  // Essential security options
  const expectedIssuer = `${process.env.KEYCLOAK_URL}/realms/${process.env.REALM}`;
  const verifyOptions = {
    algorithms: ['RS256'], // Only allow RS256
    issuer: expectedIssuer,
    audience: process.env.CLIENT_ID,
    clockTolerance: 30 // 30 seconds clock skew tolerance
  };

  const onVerified = (verifyError, claims) => {
    if (verifyError) {
      console.error('Token validation error:', verifyError.message);
      return res.status(401).json({ error: 'Invalid token' });
    }

    // Business-level checks on top of the cryptographic verification
    if (!validateAdditionalClaims(claims)) {
      return res.status(403).json({ error: 'Insufficient permissions' });
    }

    // Attach user info to request
    req.user = claims;
    next();
  };

  jwt.verify(bearerToken, getKey, verifyOptions, onVerified);
};

// Extract token from various sources
/**
 * Pull the raw access token out of an incoming request.
 *
 * Sources are tried in order: Authorization header, `access_token` cookie,
 * `access_token` query parameter. Missing `headers`, `cookies` or `query`
 * objects are tolerated so the function also works without the body/cookie/
 * query parsers installed.
 *
 * @param {object} req - Express-style request object.
 * @returns {string|null} The token, or null when none of the sources has one.
 */
function extractToken(req) {
  // 1. Authorization header (preferred).
  // HTTP auth scheme names are case-insensitive, so "bearer xyz" is accepted too.
  const authHeader = req.headers?.authorization;
  if (typeof authHeader === 'string' && authHeader.slice(0, 7).toLowerCase() === 'bearer ') {
    return authHeader.substring(7).trim();
  }

  // 2. Cookie (for web apps)
  const cookieToken = req.cookies?.access_token;
  if (cookieToken) {
    return cookieToken;
  }

  // 3. Query parameter (use with caution!) - URLs end up in logs and browser history.
  const queryToken = req.query?.access_token;
  if (queryToken) {
    console.warn('Token in query parameter - security risk!');
    return queryToken;
  }

  return null;
}

// Additional business logic validation.
// A token passes only when all of these hold:
//   - the email_verified claim is truthy
//   - the realm roles contain 'user'
//   - the organization claim, if present, is listed in ALLOWED_ORGS
function validateAdditionalClaims(decoded) {
  const requiredRole = 'user';
  const grantedRealmRoles = decoded.realm_access?.roles || [];

  // Evaluated lazily so ALLOWED_ORGS is only consulted after the earlier checks pass.
  const isOrganizationAllowed = () =>
    !decoded.organization || ALLOWED_ORGS.includes(decoded.organization);

  return (
    Boolean(decoded.email_verified) &&
    grantedRealmRoles.includes(requiredRole) &&
    isOrganizationAllowed()
  );
}

// Use the middleware: every request under /api/protected runs through
// validateToken before any route handler mounted on that path.
app.use('/api/protected', validateToken);

Python/FastAPI

Async validation with caching:

from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import httpx
from jose import jwt, JWTError
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from cachetools import TTLCache
import asyncio

app = FastAPI()
security = HTTPBearer()

# Configuration
# NOTE(review): values are hard-coded for the example; presumably they come
# from environment/settings in a real deployment.
KEYCLOAK_URL = "https://keycloak.example.com"
REALM = "myrealm"
CLIENT_ID = "my-app"
# Realm endpoint that publishes the public signing keys (JWKS)
JWKS_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/certs"

# Cache for JWKS keys, keyed by key ID (kid)
jwks_cache = TTLCache(maxsize=5, ttl=600)  # 10 minutes
# Held by TokenValidator.get_signing_key while it fetches the JWKS, so that
# concurrent cache misses do not each trigger their own request.
jwks_lock = asyncio.Lock()

class TokenValidator:
    """Validate Keycloak-issued JWTs: resolve the signing key, then verify claims.

    Signing keys are looked up by ``kid`` in the module-level ``jwks_cache`` and
    fetched from ``JWKS_URL`` on a miss. Failures are reported as
    ``HTTPException`` so the validator can be used from a FastAPI dependency.
    """

    def __init__(self):
        # Shared async HTTP client used for JWKS fetches.
        self.http_client = httpx.AsyncClient()

    @staticmethod
    def _cached_key(kid: str) -> Optional[Dict[str, Any]]:
        """Return the cached JWK for ``kid``, or None when absent or expired."""
        # Single lookup instead of "in" followed by "[]": an entry can expire
        # between the two operations and turn the read into a KeyError.
        try:
            return jwks_cache[kid]
        except KeyError:
            return None

    async def get_signing_key(self, token: str) -> Dict[str, Any]:
        """Get the signing key (JWK dict) for token verification.

        Args:
            token: The encoded JWT; only its unverified header is read here.

        Returns:
            The JWK whose ``kid`` matches the token header.

        Raises:
            HTTPException: 401 when the token has no ``kid`` or no published
                key matches it; 503 when the JWKS cannot be fetched or parsed.
        """
        # Parse header without verification
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")

        if not kid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token missing key ID"
            )

        # Check cache first
        cached_key = self._cached_key(kid)
        if cached_key is not None:
            return cached_key

        # Fetch JWKS with lock to prevent thundering herd
        async with jwks_lock:
            # Double-check cache after acquiring lock
            cached_key = self._cached_key(kid)
            if cached_key is not None:
                return cached_key

            try:
                response = await self.http_client.get(JWKS_URL)
                response.raise_for_status()
                jwks = response.json()
            except (httpx.HTTPError, ValueError) as e:
                # ValueError covers a response body that is not valid JSON.
                raise HTTPException(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    detail=f"Failed to fetch JWKS: {str(e)}"
                ) from e

            # Tolerate an unexpected document shape instead of failing with
            # KeyError/TypeError (which would surface as a 500).
            keys = jwks.get("keys") if isinstance(jwks, dict) else None
            if not isinstance(keys, list):
                keys = []

            # Find the key with matching kid
            for key in keys:
                if isinstance(key, dict) and key.get("kid") == kid:
                    jwks_cache[kid] = key
                    return key

            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Signing key not found"
            )

    async def validate_token(self, credentials: HTTPAuthorizationCredentials) -> Dict[str, Any]:
        """Validate JWT token with all security checks.

        Args:
            credentials: Bearer credentials extracted by the HTTPBearer scheme.

        Returns:
            The decoded token payload.

        Raises:
            HTTPException: 401 when signature/claim verification fails, plus
                whatever ``get_signing_key`` and ``_validate_additional_claims``
                raise.
        """
        token = credentials.credentials

        try:
            # Get signing key
            signing_key = await self.get_signing_key(token)

            # Verify token: signature, expiry, not-before, issued-at, audience, issuer
            payload = jwt.decode(
                token,
                signing_key,
                algorithms=["RS256"],
                issuer=f"{KEYCLOAK_URL}/realms/{REALM}",
                audience=CLIENT_ID,
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_nbf": True,
                    "verify_iat": True,
                    "verify_aud": True,
                    "verify_iss": True,
                    "require_exp": True,
                    "require_iat": True,
                    "require_aud": True,
                }
            )

            # Additional validation
            self._validate_additional_claims(payload)

            return payload

        except JWTError as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Token validation failed: {str(e)}"
            ) from e

    def _validate_additional_claims(self, payload: Dict[str, Any]) -> None:
        """Validate business-specific claims.

        Raises:
            HTTPException: 403 when the e-mail is not verified or a required
                scope is missing; 401 when ``typ`` is not ``"Bearer"``.
        """
        # Check email verification
        if not payload.get("email_verified", False):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Email not verified"
            )

        # Check token type
        if payload.get("typ") != "Bearer":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type"
            )

        # Check required scopes (the scope claim is a space-separated string)
        required_scopes = {"openid", "profile"}
        token_scopes = set(payload.get("scope", "").split())
        if not required_scopes.issubset(token_scopes):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient scopes"
            )

# Create validator instance (module-level; get_current_user calls it for every request)
validator = TokenValidator()

# Dependency for protected routes
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Dict[str, Any]:
    """Return the decoded token payload for the request's bearer credentials.

    Validation is delegated to the module-level ``validator``; whatever it
    raises propagates to the caller.
    """
    token_payload = await validator.validate_token(credentials)
    return token_payload

# Protected endpoint example: only reachable with a token accepted by
# get_current_user; echoes the caller's username and e-mail claims.
@app.get("/api/protected")
async def protected_route(current_user: Dict[str, Any] = Depends(get_current_user)):
    username = current_user["preferred_username"]
    email = current_user["email"]
    return {"message": "Access granted", "user": username, "email": email}

# Role-based protection
def require_role(role: str):
    """Build a dependency that admits only users holding the given realm role.

    The returned coroutine resolves the current user via ``get_current_user``,
    returns the user's claims when ``role`` is among ``realm_access.roles``,
    and raises a 403 ``HTTPException`` otherwise.
    """
    async def role_checker(current_user: Dict[str, Any] = Depends(get_current_user)):
        realm_access = current_user.get("realm_access", {})
        granted_roles = realm_access.get("roles", [])
        if role in granted_roles:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{role}' required"
        )

    return role_checker

# Admin-only endpoint: the require_role("admin") dependency answers 403 for
# callers whose realm roles do not include "admin".
@app.get("/api/admin")
async def admin_only(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    return {"message": "Admin access granted"}

Java/Spring Boot

Production-ready configuration:

/**
 * Spring Security configuration for a stateless resource server that accepts
 * Keycloak-issued JWT bearer tokens.
 *
 * <p>Wires up the filter chain, a JWT decoder backed by the realm's JWK set,
 * the token validators, and the mapping from Keycloak role claims to Spring
 * authorities.
 */
@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig {

    @Value("${keycloak.jwk-set-uri}")
    private String jwkSetUri;

    @Value("${keycloak.issuer}")
    private String issuer;

    @Value("${keycloak.audience}")
    private String audience;

    /**
     * Stateless filter chain: public paths are open, admin paths require the
     * ADMIN role, everything else requires a valid JWT.
     */
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .cors().and()
            .csrf().disable()
            .sessionManagement()
                .sessionCreationPolicy(SessionCreationPolicy.STATELESS)
            .and()
            .authorizeRequests()
                .antMatchers("/api/public/**").permitAll()
                .antMatchers("/api/admin/**").hasRole("ADMIN")
                .anyRequest().authenticated()
            .and()
            .oauth2ResourceServer()
                .jwt()
                .jwtAuthenticationConverter(jwtAuthenticationConverter());

        return http.build();
    }

    /**
     * JWT decoder that resolves signing keys from the configured JWK set URI
     * and applies the validators from {@link #jwtValidator()}.
     */
    @Bean
    public JwtDecoder jwtDecoder() {
        // The builder's cache(...) option takes a Spring Cache, not a Duration,
        // so no explicit cache is configured here.
        // NOTE(review): without it the decoder relies on the key-set caching of
        // the underlying Nimbus JWK source; confirm the default lifetime for
        // the Spring Security version in use.
        NimbusJwtDecoder decoder = NimbusJwtDecoder
            .withJwkSetUri(jwkSetUri)
            .build();

        // Add custom validators
        decoder.setJwtValidator(jwtValidator());

        return decoder;
    }

    /**
     * Composite validator: timestamps and issuer first, then audience,
     * e-mail verification and custom claims.
     */
    @Bean
    public OAuth2TokenValidator<Jwt> jwtValidator() {
        List<OAuth2TokenValidator<Jwt>> validators = new ArrayList<>();

        // Standard validators
        validators.add(new JwtTimestampValidator());
        validators.add(new JwtIssuerValidator(issuer));

        // Custom validators
        validators.add(new JwtAudienceValidator(audience));
        validators.add(new EmailVerifiedValidator());
        validators.add(new CustomClaimsValidator());

        return new DelegatingOAuth2TokenValidator<>(validators);
    }

    /**
     * Maps Keycloak roles to authorities: realm roles become {@code ROLE_<NAME>},
     * roles of the configured client become {@code ROLE_CLIENT_<NAME>}.
     */
    @Bean
    public JwtAuthenticationConverter jwtAuthenticationConverter() {
        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(jwt -> {
            // Extract roles from Keycloak token
            Collection<GrantedAuthority> authorities = new ArrayList<>();

            // Realm roles
            Map<String, Object> realmAccess = jwt.getClaimAsMap("realm_access");
            if (realmAccess != null) {
                addRoleAuthorities(authorities, realmAccess.get("roles"), "ROLE_");
            }

            // Client roles
            Map<String, Object> resourceAccess = jwt.getClaimAsMap("resource_access");
            if (resourceAccess != null) {
                Object clientAccess = resourceAccess.get(audience);
                if (clientAccess instanceof Map) {
                    addRoleAuthorities(authorities, ((Map<?, ?>) clientAccess).get("roles"), "ROLE_CLIENT_");
                }
            }

            return authorities;
        });

        return converter;
    }

    /**
     * Adds one authority per entry of {@code roles}, upper-cased and prefixed.
     * Ignores {@code roles} when it is absent or not a collection, so an
     * unexpected claim shape yields no authorities instead of a ClassCastException.
     */
    private static void addRoleAuthorities(Collection<GrantedAuthority> authorities, Object roles, String prefix) {
        if (!(roles instanceof Collection)) {
            return;
        }
        for (Object role : (Collection<?>) roles) {
            if (role != null) {
                authorities.add(new SimpleGrantedAuthority(prefix + role.toString().toUpperCase()));
            }
        }
    }
}

// Custom validators
/**
 * Accepts a JWT only when its {@code aud} claim contains the expected audience.
 *
 * <p>Deliberately not a Spring component: the only constructor takes a plain
 * {@code String}, which component scanning cannot supply, and the validator is
 * created explicitly with {@code new JwtAudienceValidator(audience)} where the
 * validator chain is assembled.
 */
public class JwtAudienceValidator implements OAuth2TokenValidator<Jwt> {
    private final String expectedAudience;

    /**
     * @param expectedAudience audience value that must appear in the token's {@code aud} claim
     */
    public JwtAudienceValidator(String expectedAudience) {
        this.expectedAudience = expectedAudience;
    }

    /**
     * @return success when the expected audience is present, otherwise a
     *         failure carrying an {@code invalid_audience} error
     */
    @Override
    public OAuth2TokenValidatorResult validate(Jwt jwt) {
        List<String> audiences = jwt.getAudience();
        if (audiences != null && audiences.contains(expectedAudience)) {
            return OAuth2TokenValidatorResult.success();
        }
        return OAuth2TokenValidatorResult.failure(new OAuth2Error(
            "invalid_audience",
            "The required audience is missing",
            null
        ));
    }
}

/**
 * Rejects tokens whose {@code email_verified} claim is missing or not {@code true}.
 */
@Component
public class EmailVerifiedValidator implements OAuth2TokenValidator<Jwt> {
    @Override
    public OAuth2TokenValidatorResult validate(Jwt jwt) {
        Boolean emailVerified = jwt.getClaim("email_verified");
        // Boolean.TRUE.equals(...) treats a missing (null) claim as "not verified".
        if (!Boolean.TRUE.equals(emailVerified)) {
            OAuth2Error notVerified = new OAuth2Error(
                "email_not_verified",
                "Email verification required",
                null
            );
            return OAuth2TokenValidatorResult.failure(notVerified);
        }
        return OAuth2TokenValidatorResult.success();
    }
}

Common Security Mistakes

1. ❌ Not Validating Signature

Bad:

// NEVER DO THIS!
// Anti-pattern: the token is only decoded, so the exp check below runs on
// claims whose signature was never verified.
const decoded = jwt.decode(token); // No signature verification!
if (decoded.exp > Date.now() / 1000) {
  // "Validated"
}

Good:

// verify() checks the signature against publicKey and accepts only RS256-signed tokens.
jwt.verify(token, publicKey, { algorithms: ['RS256'] });

2. ❌ Accepting Any Algorithm

Bad:

// Vulnerable to algorithm confusion attack
// Anti-pattern: with no `algorithms` option the accepted algorithm is not pinned by this call.
jwt.verify(token, publicKey); // No algorithm specified!

Good:

// Pin the accepted algorithm explicitly so only RS256-signed tokens can pass.
jwt.verify(token, publicKey, { algorithms: ['RS256'] });

3. ❌ Not Checking Audience

Bad:

// Token might be for different application!
// Anti-pattern: only the issuer is checked, so a token this Keycloak realm
// issued for another client would still be accepted here.
jwt.verify(token, publicKey, {
  issuer: keycloakIssuer
  // Missing audience check
});

Good:

// Check issuer AND audience, and keep the algorithm pinned so this example
// does not reintroduce mistake #2 (accepting any algorithm).
jwt.verify(token, publicKey, {
  algorithms: ['RS256'],
  issuer: keycloakIssuer,
  audience: 'my-app'
});

4. ❌ Storing Tokens Insecurely

Bad:

// Vulnerable to XSS
// Anti-pattern: anything in localStorage can be read by any script running on the page.
localStorage.setItem('token', accessToken);

Good:

// HttpOnly cookie (set by server)
// Unlike localStorage, an HttpOnly cookie is not readable from page JavaScript.
// The cookie name matches the `access_token` cookie that extractToken reads.
res.cookie('access_token', token, {
  httpOnly: true,
  secure: true,
  sameSite: 'strict',
  maxAge: 15 * 60 * 1000 // 15 minutes
});

Performance Best Practices

1. Cache JWKS Keys

const NodeCache = require('node-cache');
// In-memory cache for signing keys, keyed by key ID (kid) in getSigningKey.
// NOTE(review): stdTTL/checkperiod are presumably in seconds (600 is annotated as 10 minutes); confirm.
const jwksCache = new NodeCache({ 
  stdTTL: 600, // 10 minutes
  checkperiod: 120 
});

/**
 * Return the signing key for a key ID, consulting the cache before Keycloak.
 *
 * @param {string} kid - Key ID from the token header.
 * @returns {Promise<*>} The cached key, or the freshly fetched (and now cached) one.
 */
async function getSigningKey(kid) {
  const cachedKey = jwksCache.get(kid);

  if (!cachedKey) {
    // Cache miss: fetch from Keycloak and remember the result for later calls.
    const fetchedKey = await fetchKeyFromKeycloak(kid);
    jwksCache.set(kid, fetchedKey);
    return fetchedKey;
  }

  return cachedKey;
}

2. Validate Once Per Request

// Middleware validates once: registered ahead of the routes below
app.use(validateToken);

// Controllers trust req.user, which the middleware has already populated,
// so there is no need to re-validate the token here.
app.get('/api/profile', ({ user }, res) => {
  res.json({ user });
});

3. Async Validation

// Process multiple validations concurrently
// NOTE(review): assumes a promise-returning validateToken(token), since
// .catch() is called on its result.
// Each rejection is mapped to an { error } object, so one bad token does not
// make Promise.all reject; results has one entry per input token.
const validationPromises = tokens.map(token => 
  validateToken(token).catch(err => ({ error: err }))
);

const results = await Promise.all(validationPromises);

Advanced Validation Scenarios

Multi-Tenant Validation

// One JWKS client per realm so each tenant's signing keys are fetched and cached separately.
// NOTE(review): grows with the number of distinct tenant IDs seen; restrict
// tenantId to a known allowlist if tenants are not a small fixed set.
const tenantJwksClients = new Map();

// Return (creating on first use) the JWKS client for a realm's certs endpoint.
function getTenantJwksClient(jwksUri) {
  let tenantClient = tenantJwksClients.get(jwksUri);
  if (!tenantClient) {
    tenantClient = jwksClient({
      jwksUri,
      cache: true,
      rateLimit: true,
      jwksRequestsPerMinute: 10
    });
    tenantJwksClients.set(jwksUri, tenantClient);
  }
  return tenantClient;
}

/**
 * Verify a token against the Keycloak realm that belongs to a tenant.
 *
 * @param {string} token - Encoded JWT.
 * @param {string} tenantId - Realm name of the tenant. Must be a single safe
 *   path segment because it is interpolated into the issuer and JWKS URLs.
 * @returns {Promise<object>} Resolves with the decoded claims; rejects when
 *   the tenant ID is malformed, the signing key cannot be resolved, or
 *   verification fails.
 */
function validateMultiTenantToken(token, tenantId) {
  // Refuse anything that could change the URL structure ("../", "/", "?", ...).
  if (typeof tenantId !== 'string' || !/^[A-Za-z0-9][A-Za-z0-9._-]*$/.test(tenantId)) {
    return Promise.reject(new Error('Invalid tenant identifier'));
  }

  const issuer = `${KEYCLOAK_URL}/realms/${tenantId}`;
  const jwksUri = `${issuer}/protocol/openid-connect/certs`;
  const tenantClient = getTenantJwksClient(jwksUri);

  // Key resolver in the (header, callback) shape expected by jwt.verify.
  const resolveKey = (header, callback) => {
    tenantClient.getSigningKey(header.kid, (err, key) => {
      if (err) {
        return callback(err);
      }
      callback(null, key.publicKey || key.rsaPublicKey);
    });
  };

  // A key-resolver function requires the callback form of verify, so wrap it in a Promise.
  return new Promise((resolve, reject) => {
    jwt.verify(
      token,
      resolveKey,
      {
        algorithms: ['RS256'],
        issuer,
        audience: CLIENT_ID
      },
      (err, decoded) => (err ? reject(err) : resolve(decoded))
    );
  });
}

Refresh Token Handling

/**
 * Exchange a refresh token for a new token set at the realm's token endpoint.
 *
 * @param {string} refreshToken - Refresh token previously issued to this client.
 * @returns {Promise<object>} The token endpoint's response body.
 * @throws {Error} 'Session expired. Please login again.' when the endpoint
 *   answers 400 (the original error is attached as `cause`); any other
 *   failure is rethrown unchanged.
 */
async function refreshAccessToken(refreshToken) {
  try {
    const response = await axios.post(
      `${KEYCLOAK_URL}/realms/${REALM}/protocol/openid-connect/token`,
      new URLSearchParams({
        grant_type: 'refresh_token',
        refresh_token: refreshToken,
        client_id: CLIENT_ID,
        client_secret: CLIENT_SECRET
      })
    );

    return response.data;
  } catch (error) {
    if (error.response?.status === 400) {
      // Refresh token expired or invalid.
      // Keep the upstream error as `cause` so the endpoint's details stay available for debugging.
      throw new Error('Session expired. Please login again.', { cause: error });
    }
    throw error;
  }
}

Token Introspection

For opaque tokens or additional validation:

/**
 * Ask the realm's introspection endpoint whether a token is currently active.
 *
 * @param {string} token - Token to introspect.
 * @returns {Promise<object>} The introspection response body when `active` is truthy.
 * @throws {Error} 'Token is not active' when the endpoint reports the token as inactive.
 */
async function introspectToken(token) {
  const introspectionUrl = `${KEYCLOAK_URL}/realms/${REALM}/protocol/openid-connect/token/introspect`;
  const form = new URLSearchParams({
    token,
    client_id: CLIENT_ID,
    client_secret: CLIENT_SECRET
  });

  const { data } = await axios.post(introspectionUrl, form);

  if (data.active) {
    return data;
  }
  throw new Error('Token is not active');
}

Testing JWT Validation

Unit Tests

// Unit tests for a promise-returning validateToken(token): expired tokens and
// wrong audiences must be rejected, a well-formed token must be accepted.
describe('JWT Validation', () => {
  // Current time in seconds (JWT timestamps use seconds, not milliseconds), shifted by an offset.
  const secondsFromNow = (offsetSeconds) => Date.now() / 1000 + offsetSeconds;

  it('should reject expired tokens', async () => {
    const oneHourAgo = secondsFromNow(-3600);
    const expiredToken = generateTestToken({ exp: oneHourAgo });

    const outcome = expect(validateToken(expiredToken));
    await outcome.rejects.toThrow('jwt expired');
  });

  it('should reject tokens with wrong audience', async () => {
    const wrongAudToken = generateTestToken({ aud: 'wrong-app' });

    const outcome = expect(validateToken(wrongAudToken));
    await outcome.rejects.toThrow('jwt audience invalid');
  });

  it('should accept valid tokens', async () => {
    const claims = {
      iss: `${KEYCLOAK_URL}/realms/${REALM}`,
      aud: CLIENT_ID,
      exp: secondsFromNow(3600)
    };
    const validToken = generateTestToken(claims);

    const { sub } = await validateToken(validToken);
    expect(sub).toBeDefined();
  });
});

Integration Tests

// Integration tests against the running app: /api/protected must answer 401
// without a token and 200 with a valid bearer token.
describe('Protected Endpoints', () => {
  const PROTECTED_PATH = '/api/protected';

  it('should return 401 without token', async () => {
    const { body } = await request(app)
      .get(PROTECTED_PATH)
      .expect(401);

    expect(body.error).toBe('No token provided');
  });

  it('should return 200 with valid token', async () => {
    const token = await getTestToken();
    const authorization = `Bearer ${token}`;

    const { body } = await request(app)
      .get(PROTECTED_PATH)
      .set('Authorization', authorization)
      .expect(200);

    expect(body.message).toBe('Access granted');
  });
});

Monitoring and Alerting

Track Validation Metrics

const promClient = require('prom-client');

// Metrics
// Counter of validation outcomes, labelled by status and, for failures, a reason.
const validationCounter = new promClient.Counter({
  name: 'jwt_validations_total',
  help: 'Total JWT validations',
  labelNames: ['status', 'reason']
});

// Histogram of how long a validation takes; no buckets are configured here.
const validationDuration = new promClient.Histogram({
  name: 'jwt_validation_duration_seconds',
  help: 'JWT validation duration'
});

// Track in middleware
/**
 * Express middleware that wraps validateToken and records the outcome.
 *
 * validateToken is callback-style: it signals success by calling next() and
 * signals failure by sending a response itself (without throwing). The outcome
 * is therefore observed through those two channels:
 *   - next() called     -> success (or failure/'middleware_error' if next(err))
 *   - response finished -> failure, reason `http_<status>` (e.g. http_401)
 *   - exception/rejection -> failure, reason 'exception', forwarded to next(err)
 * Reasons come from a small fixed set so the metric's label values stay bounded.
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response (must be an event emitter, as Express responses are).
 * @param {Function} next - Express continuation.
 * @returns {Promise<void>}
 */
const validateTokenWithMetrics = async (req, res, next) => {
  const end = validationDuration.startTimer();
  let recorded = false;
  let nextCalled = false;

  // Fires when validateToken answered the request itself (rejected token).
  function onResponseFinished() {
    record({ status: 'failure', reason: `http_${res.statusCode}` });
  }

  // Record exactly one outcome per request and stop the timer.
  function record(labels) {
    if (recorded) {
      return;
    }
    recorded = true;
    res.removeListener('finish', onResponseFinished);
    validationCounter.inc(labels);
    end();
  }

  res.once('finish', onResponseFinished);

  try {
    // await is harmless for the callback-style middleware and also covers a
    // promise-returning validateToken.
    await validateToken(req, res, (err) => {
      nextCalled = true;
      record(err ? { status: 'failure', reason: 'middleware_error' } : { status: 'success' });
      next(err);
    });
  } catch (error) {
    record({ status: 'failure', reason: 'exception' });
    if (nextCalled) {
      // The error came from downstream of next(); do not invoke next twice.
      throw error;
    }
    // Hand the error to Express instead of leaving an unhandled rejection.
    next(error);
  }
};

Security Checklist

  • Always verify JWT signature
  • Validate issuer (iss) claim
  • Validate audience (aud) claim
  • Check expiration (exp) claim
  • Validate not-before (nbf) claim if present
  • Use HTTPS everywhere
  • Implement key rotation
  • Cache JWKS responsibly
  • Handle clock skew appropriately
  • Validate custom claims for business logic
  • Never log full JWT tokens
  • Implement proper error handling
  • Monitor validation failures
  • Keep libraries updated
  • Test edge cases thoroughly

Next Steps