authentication

Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "authentication" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-authentication

Authentication Skill

Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.

When to Use This Skill

Activate this skill when working with:

  • User authentication flows

  • JWT token management

  • OAuth2 integration

  • Session management

  • Role-based access control (RBAC)

JWT Authentication

Token Generation

from jose import jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(user_id: str, roles: list[str]) -> str:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_id,
"roles": roles,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(user_id: str) -> str:
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": user_id,
"exp": expire,
"type": "refresh"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")

Password Hashing

def hash_password(password: str) -> str:
return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)

Usage

async def authenticate_user(email: str, password: str) -> User | None:
user = await get_user_by_email(email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user

FastAPI Auth Dependencies

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
payload = verify_token(token)
user = await get_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user

async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user

Role-based access

def require_roles(*roles: str):
async def role_checker(user: User = Depends(get_current_user)):
if not any(role in user.roles for role in roles):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return role_checker

Usage

@app.get("/admin")
async def admin_route(user: User = Depends(require_roles("admin"))):
return {"message": "Admin access granted"}

OAuth2 Integration

Google OAuth2

from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
name='google',
client_id=os.environ['GOOGLE_CLIENT_ID'],
client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)

@app.get('/auth/google')
async def google_login(request: Request):
redirect_uri = request.url_for('google_callback')
return await oauth.google.authorize_redirect(request, redirect_uri)

@app.get('/auth/google/callback')
async def google_callback(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')

# Find or create user
user = await get_or_create_user(
    email=user_info['email'],
    name=user_info['name'],
    provider='google'
)

# Generate JWT
access_token = create_access_token(user.id, user.roles)
return {"access_token": access_token, "token_type": "bearer"}

GitHub OAuth2

oauth.register(
name='github',
client_id=os.environ['GITHUB_CLIENT_ID'],
client_secret=os.environ['GITHUB_CLIENT_SECRET'],
authorize_url='https://github.com/login/oauth/authorize',
access_token_url='https://github.com/login/oauth/access_token',
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'}
)

Session Management

from fastapi import Request, Response
import secrets

async def create_session(user_id: str, response: Response) -> str:
session_id = secrets.token_urlsafe(32)

# Store in Redis
await redis.hset(f"session:{session_id}", mapping={
    "user_id": user_id,
    "created_at": datetime.utcnow().isoformat()
})
await redis.expire(f"session:{session_id}", 86400)  # 24 hours

# Set cookie
response.set_cookie(
    key="session_id",
    value=session_id,
    httponly=True,
    secure=True,
    samesite="lax",
    max_age=86400
)

return session_id

async def get_session(request: Request) -> dict | None:
session_id = request.cookies.get("session_id")
if not session_id:
return None

session = await redis.hgetall(f"session:{session_id}")
if not session:
    return None

# Refresh TTL
await redis.expire(f"session:{session_id}", 86400)
return session

async def destroy_session(request: Request, response: Response):
session_id = request.cookies.get("session_id")
if session_id:
await redis.delete(f"session:{session_id}")
response.delete_cookie("session_id")

RBAC Implementation

from enum import Enum
from typing import Set

class Permission(str, Enum):
READ_AGENTS = "read:agents"
WRITE_AGENTS = "write:agents"
DELETE_AGENTS = "delete:agents"
ADMIN = "admin"

ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
"viewer": {Permission.READ_AGENTS},
"operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
"admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN},
}

def has_permission(user_roles: list[str], required: Permission) -> bool:
for role in user_roles:
if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]:
return True
return False

def require_permission(permission: Permission):
async def permission_checker(user: User = Depends(get_current_user)):
if not has_permission(user.roles, permission):
raise HTTPException(status_code=403, detail="Permission denied")
return user
return permission_checker

Usage

@app.delete("/agents/{id}")
async def delete_agent(
id: str,
user: User = Depends(require_permission(Permission.DELETE_AGENTS))
):
await agent_service.delete(id)
return {"status": "deleted"}

Security Best Practices

  • Use HTTPS always in production

  • Hash passwords with bcrypt or argon2

  • Short-lived access tokens (15-30 minutes)

  • Refresh token rotation on each use

  • HttpOnly, Secure cookies for tokens

  • Rate limit authentication endpoints

  • Log authentication events for auditing

  • Implement account lockout after failed attempts

OAuth 2.0 & OIDC Best Practices

OAuth 2.0 Grant Types

Authorization Code Flow (Recommended for Web Apps)

from authlib.integrations.starlette_client import OAuth

oauth = OAuth() oauth.register( name='custom_provider', client_id=os.environ['OAUTH_CLIENT_ID'], client_secret=os.environ['OAUTH_CLIENT_SECRET'], authorize_url='https://provider.com/oauth/authorize', authorize_params=None, access_token_url='https://provider.com/oauth/token', access_token_params=None, refresh_token_url=None, client_kwargs={'scope': 'openid profile email'} )

@app.get('/auth/login') async def login(request: Request): # Generate state for CSRF protection state = secrets.token_urlsafe(32) await redis.set(f"oauth_state:{state}", "1", ex=600)

redirect_uri = request.url_for('auth_callback')
return await oauth.custom_provider.authorize_redirect(
    request,
    redirect_uri,
    state=state
)

@app.get('/auth/callback') async def auth_callback(request: Request): # Verify state (CSRF protection) state = request.query_params.get('state') if not await redis.get(f"oauth_state:{state}"): raise HTTPException(status_code=400, detail="Invalid state")

await redis.delete(f"oauth_state:{state}")

# Exchange authorization code for tokens
token = await oauth.custom_provider.authorize_access_token(request)
user_info = token.get('userinfo')

# Create or update user
user = await upsert_user(user_info)

# Issue application tokens
access_token = create_access_token(user.id, user.roles)
refresh_token = create_refresh_token(user.id)

return {
    "access_token": access_token,
    "refresh_token": refresh_token,
    "token_type": "bearer"
}

PKCE (Proof Key for Code Exchange) for SPAs

Frontend (JavaScript/TypeScript)

import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'

// Generate PKCE parameters const codeVerifier = generateCodeVerifier() const codeChallenge = await generateCodeChallenge(codeVerifier)

// Store verifier for later use sessionStorage.setItem('code_verifier', codeVerifier)

// Redirect to authorization endpoint const authUrl = new URL('https://provider.com/oauth/authorize') authUrl.searchParams.set('client_id', CLIENT_ID) authUrl.searchParams.set('redirect_uri', REDIRECT_URI) authUrl.searchParams.set('response_type', 'code') authUrl.searchParams.set('scope', 'openid profile email') authUrl.searchParams.set('code_challenge', codeChallenge) authUrl.searchParams.set('code_challenge_method', 'S256') authUrl.searchParams.set('state', generateState())

window.location.href = authUrl.toString()

// In callback handler const code = new URLSearchParams(window.location.search).get('code') const codeVerifier = sessionStorage.getItem('code_verifier')

const tokenResponse = await fetch('https://provider.com/oauth/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code: code, redirect_uri: REDIRECT_URI, client_id: CLIENT_ID, code_verifier: codeVerifier }) })

Client Credentials Flow (Service-to-Service)

import httpx from datetime import datetime, timedelta

class ServiceAuthClient: def init(self): self.token = None self.expires_at = None

async def get_token(self) -> str:
    # Return cached token if still valid
    if self.token and self.expires_at > datetime.utcnow():
        return self.token

    # Request new token
    async with httpx.AsyncClient() as client:
        response = await client.post(
            'https://provider.com/oauth/token',
            data={
                'grant_type': 'client_credentials',
                'client_id': os.environ['SERVICE_CLIENT_ID'],
                'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
                'scope': 'api.read api.write'
            }
        )
        response.raise_for_status()
        data = response.json()

        self.token = data['access_token']
        self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)

        return self.token

Usage

auth_client = ServiceAuthClient()

async def call_protected_api(): token = await auth_client.get_token() async with httpx.AsyncClient() as client: response = await client.get( 'https://api.service.com/resource', headers={'Authorization': f'Bearer {token}'} ) return response.json()

OpenID Connect (OIDC)

ID Token Validation

from jose import jwt, jwk from jose.utils import base64url_decode import httpx

class OIDCValidator: def init(self, issuer: str, client_id: str): self.issuer = issuer self.client_id = client_id self.jwks = None self.jwks_updated_at = None

async def get_jwks(self) -> dict:
    # Refresh JWKS if stale (cache for 24 hours)
    if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.issuer}/.well-known/jwks.json")
            response.raise_for_status()
            self.jwks = response.json()
            self.jwks_updated_at = datetime.utcnow()

    return self.jwks

async def validate_id_token(self, id_token: str) -> dict:
    # Decode header to get key ID
    header = jwt.get_unverified_header(id_token)
    kid = header['kid']

    # Get JWKS and find matching key
    jwks = await self.get_jwks()
    key = next((k for k in jwks['keys'] if k['kid'] == kid), None)

    if not key:
        raise ValueError("Public key not found in JWKS")

    # Convert JWK to PEM
    public_key = jwk.construct(key)

    # Validate and decode ID token
    try:
        claims = jwt.decode(
            id_token,
            public_key.to_pem().decode('utf-8'),
            algorithms=['RS256'],
            audience=self.client_id,
            issuer=self.issuer,
            options={
                'verify_exp': True,
                'verify_iat': True,
                'verify_aud': True,
                'verify_iss': True
            }
        )

        # Additional validations
        if claims.get('nonce'):
            # Verify nonce matches what was sent in auth request
            pass

        return claims

    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="ID token expired")
    except jwt.JWTClaimsError as e:
        raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")

Usage

validator = OIDCValidator( issuer="https://provider.com", client_id=os.environ['OIDC_CLIENT_ID'] )

@app.post("/auth/oidc/callback") async def oidc_callback(id_token: str): claims = await validator.validate_id_token(id_token)

# Extract user information
user = await get_or_create_user(
    email=claims['email'],
    name=claims['name'],
    sub=claims['sub']
)

return {"user": user, "claims": claims}

JWT Security Best Practices

Secure Token Generation

from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend import os

Use RS256 (asymmetric) instead of HS256 for public verification

PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH') PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')

def load_keys(): with open(PRIVATE_KEY_PATH, 'rb') as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() )

with open(PUBLIC_KEY_PATH, 'rb') as f:
    public_key = serialization.load_pem_public_key(
        f.read(),
        backend=default_backend()
    )

return private_key, public_key

PRIVATE_KEY, PUBLIC_KEY = load_keys()

def create_secure_access_token( user_id: str, roles: list[str], tenant_id: str = None, custom_claims: dict = None ) -> str: now = datetime.utcnow()

payload = {
    # Standard claims (RFC 7519)
    "iss": "https://api.yourdomain.com",  # Issuer
    "sub": user_id,                        # Subject (user ID)
    "aud": ["https://api.yourdomain.com"], # Audience
    "exp": now + timedelta(minutes=15),    # Expiration (short-lived)
    "nbf": now,                             # Not before
    "iat": now,                             # Issued at
    "jti": secrets.token_urlsafe(16),      # JWT ID (unique token identifier)

    # Custom claims
    "roles": roles,
    "type": "access",
}

# Add tenant context for multi-tenant applications
if tenant_id:
    payload["tenant_id"] = tenant_id

# Add any custom claims
if custom_claims:
    payload.update(custom_claims)

return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")

def verify_secure_token(token: str) -> dict: try: payload = jwt.decode( token, PUBLIC_KEY, algorithms=["RS256"], audience=["https://api.yourdomain.com"], issuer="https://api.yourdomain.com", options={ 'verify_exp': True, 'verify_nbf': True, 'verify_iat': True, 'verify_aud': True, 'verify_iss': True, 'require_exp': True, 'require_iat': True, 'require_nbf': True } )

    # Validate token type
    if payload.get('type') != 'access':
        raise HTTPException(status_code=401, detail="Invalid token type")

    return payload

except jwt.ExpiredSignatureError:
    raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
    raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

Token Revocation with Blacklist

class TokenBlacklist: """Redis-based token blacklist for revoked tokens"""

def __init__(self, redis_client):
    self.redis = redis_client

async def revoke_token(self, jti: str, exp: int):
    """Add token to blacklist until expiration"""
    ttl = exp - int(datetime.utcnow().timestamp())
    if ttl > 0:
        await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)

async def is_revoked(self, jti: str) -> bool:
    """Check if token is blacklisted"""
    return await self.redis.exists(f"blacklist:{jti}")

Global blacklist instance

token_blacklist = TokenBlacklist(redis)

async def get_current_user_with_revocation_check( token: str = Depends(oauth2_scheme) ) -> User: payload = verify_secure_token(token)

# Check if token has been revoked
if await token_blacklist.is_revoked(payload['jti']):
    raise HTTPException(status_code=401, detail="Token has been revoked")

user = await get_user(payload["sub"])
if not user:
    raise HTTPException(status_code=401, detail="User not found")

return user

@app.post("/auth/logout") async def logout(token: str = Depends(oauth2_scheme)): payload = verify_secure_token(token) await token_blacklist.revoke_token(payload['jti'], payload['exp']) return {"message": "Logged out successfully"}

Security Analysis with Extended Thinking

When reviewing authentication flows, use extended thinking for comprehensive security analysis.

Authentication Flow Security Review Template

Authentication Flow Security Review

Flow: [Login/OAuth/SSO/API Authentication] Date: [YYYY-MM-DD] Reviewer: [Name/Agent]

Flow Diagram

[Document the authentication flow step-by-step]

Security Analysis Checklist

Confidentiality

  • Credentials transmitted over HTTPS only
  • Passwords hashed with strong algorithm (bcrypt/argon2)
  • Tokens encrypted in transit
  • Sensitive data not logged
  • Secrets stored securely (env vars, secrets manager)

Integrity

  • CSRF protection implemented
  • Request tampering prevented
  • Token signature validation
  • State parameter validated (OAuth)
  • Nonce validated (OIDC)

Availability

  • Rate limiting on auth endpoints
  • Account lockout after failed attempts
  • DDoS protection in place
  • Graceful degradation strategy
  • Session timeout configured

Authentication

  • MFA available for sensitive accounts
  • Password complexity requirements
  • Credential stuffing protection
  • Brute force mitigation
  • Session fixation prevention

Authorization

  • Principle of least privilege
  • Role-based access control
  • Permission checks on every request
  • Token scope validation
  • Tenant isolation (multi-tenant apps)

Session Management

  • Secure session tokens
  • HttpOnly, Secure, SameSite cookies
  • Session timeout implemented
  • Logout functionality
  • Concurrent session handling

Extended Thinking Analysis

Use Claude with extended thinking for deep security review:

import anthropic

client = anthropic.Anthropic()

security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:

[Paste authentication code/flow description]

Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws

Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""

response = client.messages.create(
    model="claude-opus-4-5-20250514",
    max_tokens=32000,
    thinking={
        "type": "enabled",
        "budget_tokens": 20000  # High budget for security analysis
    },
    messages=[{
        "role": "user",
        "content": security_review_prompt
    }]
)

# Extract thinking and analysis
for block in response.content:
    if block.type == "thinking":
        print(f"Deep Analysis:\n{block.thinking}\n")
    elif block.type == "text":
        print(f"Findings:\n{block.text}")

Threat Modeling for Authentication

STRIDE Threat Model for Auth Systems

Adapted from [[deep-analysis]] skill threat modeling template:

## Authentication Threat Model

### System: [Auth System Name]
**Version**: 1.0
**Last Updated**: [Date]

### Trust Boundaries

┌─────────────────────────────────────────┐
│         External (Untrusted)            │
│    [End Users] [Credential Stuffers]    │
│         [MITM Attackers]                │
└──────────────────┬──────────────────────┘
│ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│         Public API (Semi-trusted)       │
│   [API Gateway] [Auth Endpoints]        │
│   [OAuth Providers] [OIDC IdP]          │
└──────────────────┬──────────────────────┘
│ Internal Auth
┌──────────────────┴──────────────────────┐
│        Application Layer (Trusted)      │
│   [Business Logic] [User Management]    │
└──────────────────┬──────────────────────┘
│ DB Protocol
┌──────────────────┴──────────────────────┐
│        Data Layer (Highly Trusted)      │
│   [User DB] [Session Store] [Secrets]   │
└─────────────────────────────────────────┘

### STRIDE Analysis

#### Spoofing Identity
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credential theft via phishing | High | Critical | MFA, user education | ✅ |
| Session hijacking | Medium | High | Secure cookies, HTTPS | ✅ |
| Token replay attacks | Medium | High | Short token lifetime, JTI tracking | ✅ |
| OAuth state manipulation | Low | Medium | Cryptographic state validation | ✅ |
| Impersonation via stolen refresh token | Medium | Critical | Refresh token rotation, device binding | ⚠️ |

#### Tampering with Data
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| JWT payload manipulation | Low | Critical | Signature verification, RS256 | ✅ |
| Cookie tampering | Low | High | Signed cookies, HMAC validation | ✅ |
| OAuth callback manipulation | Medium | High | Redirect URI validation | ✅ |
| Password reset token tampering | Low | High | Cryptographic tokens, time limits | ✅ |

#### Repudiation
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Deny unauthorized access | Medium | Medium | Comprehensive audit logging | ✅ |
| Dispute authentication events | Low | Low | Immutable audit trail, timestamps | ✅ |

#### Information Disclosure
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credentials in logs | Medium | Critical | Sanitize logs, secret detection | ✅ |
| User enumeration via login | High | Medium | Generic error messages | ⚠️ |
| Token leakage in URLs | Low | High | Tokens in headers only | ✅ |
| Timing attacks on password check | Medium | Medium | Constant-time comparison | ✅ |
| JWKS endpoint information leak | Low | Low | Rate limiting, monitoring | ✅ |

#### Denial of Service
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Brute force attacks | High | High | Rate limiting, CAPTCHA, account lockout | ✅ |
| Resource exhaustion (bcrypt) | Medium | Medium | Request throttling, async processing | ✅ |
| Session store exhaustion | Low | High | Session limits per user, TTL | ✅ |
| OAuth callback flooding | Medium | Medium | State validation, rate limiting | ⚠️ |

#### Elevation of Privilege
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Role manipulation in JWT | Low | Critical | Server-side role verification | ✅ |
| Privilege escalation via API | Medium | Critical | Permission checks on every request | ✅ |
| Admin impersonation | Low | Critical | Additional auth for admin actions | ⚠️ |
| OAuth scope escalation | Low | High | Strict scope validation | ✅ |

### Risk Matrix
| Threat ID | Threat | Likelihood | Impact | Risk Score | Priority |
|-----------|--------|------------|--------|------------|----------|
| T1 | Credential stuffing attack | High | Critical | 9 | P0 |
| T2 | Refresh token theft | Medium | Critical | 8 | P1 |
| T3 | User enumeration | High | Medium | 6 | P2 |
| T4 | OAuth callback flooding | Medium | Medium | 4 | P2 |
| T5 | Admin impersonation | Low | Critical | 7 | P1 |

### Attack Scenarios

#### Scenario 1: Credential Stuffing Attack
**Attacker Goal**: Gain unauthorized access using leaked credentials

**Attack Steps**:
1. Obtain credential database from breach
2. Automate login attempts across accounts
3. Bypass rate limiting with distributed IPs
4. Identify valid credentials
5. Access user accounts

**Defenses**:
- Rate limiting per IP and per account
- CAPTCHA after N failed attempts
- Anomaly detection (impossible travel, new device)
- Breach database monitoring (HaveIBeenPwned)
- Mandatory MFA for sensitive accounts

#### Scenario 2: Token Theft via XSS
**Attacker Goal**: Steal access token to impersonate user

**Attack Steps**:
1. Inject malicious script via vulnerable input
2. Script reads token from localStorage
3. Exfiltrate token to attacker server
4. Use token to access API as victim

**Defenses**:
- Store tokens in HttpOnly cookies (not accessible to JS)
- Content Security Policy (CSP)
- Input sanitization and validation
- Regular security audits
- Short token lifetimes

### Recommendations by Priority

#### P0 (Critical - Immediate Action)
1. Implement credential stuffing protection
2. Add device fingerprinting for anomaly detection
3. Enable MFA for all admin accounts

#### P1 (High - Within Sprint)
1. Implement refresh token rotation
2. Add additional auth step for admin impersonation
3. Strengthen OAuth callback validation

#### P2 (Medium - Next Quarter)
1. Improve user enumeration protection
2. Implement risk-based authentication
3. Add behavioral biometrics

Common Vulnerabilities & Mitigations

OWASP Top 10 for Authentication

A01: Broken Access Control

# VULNERABLE: Client-side role check only
@app.get("/admin/users")
async def get_users(user: User = Depends(get_current_user)):
    # No server-side permission check!
    return await db.users.find_all()

# SECURE: Server-side permission enforcement
@app.get("/admin/users")
async def get_users(user: User = Depends(require_permission(Permission.ADMIN))):
    # Permission verified on server
    return await db.users.find_all()

A02: Cryptographic Failures

# VULNERABLE: Weak hashing
hashed = hashlib.md5(password.encode()).hexdigest()

# SECURE: Strong adaptive hashing
from passlib.context import CryptContext
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__rounds=12  # Adjust based on security/performance needs
)
hashed = pwd_context.hash(password)

A03: Injection

# VULNERABLE: SQL injection in auth query
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"

# SECURE: Parameterized queries
query = "SELECT * FROM users WHERE email = $1"
user = await db.fetch_one(query, email)
if user and pwd_context.verify(password, user.hashed_password):
    return user

A07: Identification and Authentication Failures

# VULNERABLE: Weak session management
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()

# SECURE: Cryptographically secure session tokens
import secrets
session_id = secrets.token_urlsafe(32)

# VULNERABLE: No rate limiting
@app.post("/auth/login")
async def login(credentials: LoginRequest):
    return await authenticate(credentials)

# SECURE: Rate limiting with slowdown
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/auth/login")
@limiter.limit("5/minute")  # 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
    return await authenticate(credentials)

Integration with Keycloak

For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):

from keycloak import KeycloakOpenID

# Configure Keycloak client
keycloak_openid = KeycloakOpenID(
    server_url="http://localhost:8080/",
    client_id="your-client",
    realm_name="your-realm",
    client_secret_key="your-secret"
)

# Get token
token = keycloak_openid.token(username, password)

# Validate token
token_info = keycloak_openid.introspect(token['access_token'])

# Get user info
user_info = keycloak_openid.userinfo(token['access_token'])

# Decode and verify token locally
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + \
    keycloak_openid.public_key() + \
    "\n-----END PUBLIC KEY-----"

options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
    token['access_token'],
    key=KEYCLOAK_PUBLIC_KEY,
    options=options
)

Related Skills & Resources

Skills

- [[keycloak]] - Enterprise identity and access management

- [[deep-analysis]] - Security audit templates and threat modeling

- [[extended-thinking]] - Enable deep reasoning for security analysis

- [[complex-reasoning]] - Hypothesis-driven debugging for auth issues

Keycloak Agents

- keycloak-realm-admin - Realm and client management

- keycloak-security-auditor - Security review and compliance

- keycloak-auth-flow-designer - Custom authentication flows

- keycloak-identity-specialist - Federation and SSO setup

External Resources

- OAuth 2.0 RFC 6749

- OpenID Connect Core 1.0

- JWT Best Practices RFC 8725

- OWASP Authentication Cheat Sheet

- OWASP Session Management

Troubleshooting

Common Issues

Token Verification Failures

# Debug JWT token
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"

# Verify token signature
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt

# Check token expiration
date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")

OAuth Flow Issues

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Log OAuth flow steps
logger.debug(f"Redirect URI: {redirect_uri}")
logger.debug(f"State: {state}")
logger.debug(f"Code: {code}")
logger.debug(f"Token response: {token_response}")

Session Issues

# Check Redis session data
redis-cli
> KEYS session:*
> HGETALL session:abc123
> TTL session:abc123

Last Updated: 2025-12-12
Version: 2.0.0
Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

workflow automation

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

jira orchestration workflow

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

pr-workflow

No summary provided by upstream source.

Repository SourceNeeds Review
General

vision-multimodal

No summary provided by upstream source.

Repository SourceNeeds Review