Authentication works initially but fails after some time
Inconsistent authentication errors
Solutions:
Check token expiration:
import jwt
from datetime import datetime

# Decode the token without verifying the signature, only to inspect its expiration
try:
    payload = jwt.decode(token, options={"verify_signature": False})
    exp = payload.get("exp", 0)
    current_time = datetime.now().timestamp()

    if current_time > exp:
        print(f"Token expired {(current_time - exp) / 60:.1f} minutes ago")
    else:
        print(f"Token valid for {(exp - current_time) / 60:.1f} more minutes")
except Exception as e:
    print(f"Failed to decode token: {e}")
Implement proper token refresh:
# Definable doesn't use refresh tokens by default
# You'll need to log in again when tokens expire
# Default expiration time is set in settings.jwt_expire_minutes

# For a client-side expiration check:
async def should_refresh_auth(token_exp: float) -> bool:
    # token_exp is the "exp" claim taken from the decoded payload
    current_time = datetime.now().timestamp()

    if token_exp - current_time < 300:  # 5-minute buffer
        # Redirect to login (redirect_to_login is your application's own handler)
        redirect_to_login()
        return True
    return False
Verify token format:
import base64

# Definable uses the HS256 algorithm for JWT
# Tokens should have header.payload.signature format
def is_valid_jwt_format(token):
    parts = token.split(".")
    if len(parts) != 3:
        return False

    try:
        # Header and payload should be valid base64url (JWTs omit the "=" padding)
        for part in parts[0:2]:
            padding = "=" * (-len(part) % 4)
            base64.urlsafe_b64decode(part + padding)
        return True
    except Exception:
        return False
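Since the comment above states that Definable signs tokens with HS256, it can also help to confirm that a failing token declares that algorithm in its header. The following is a minimal sketch using only the standard library; the helper names `jwt_header` and `uses_expected_algorithm` are hypothetical and not part of Definable.

```python
import base64
import json


def jwt_header(token: str) -> dict:
    """Decode the (unverified) JWT header segment into a dict."""
    header_b64 = token.split(".")[0]
    # JWT segments are base64url without padding; restore it before decoding.
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def uses_expected_algorithm(token: str, expected: str = "HS256") -> bool:
    """Return True if the token header declares the expected algorithm."""
    try:
        return jwt_header(token).get("alg") == expected
    except (ValueError, AttributeError):
        # Covers invalid base64, invalid JSON, and a header that is not an object.
        return False
```

This only reads the header; it does not verify the signature, so treat it as a debugging aid rather than a validation step.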
Check JWT secret in environment:
Verify that the JWT_SECRET environment variable is properly set
Ensure the same secret is used across all services/instances
Verify that it matches the value in your .env file
# Check JWT secret in environment variables
echo "$JWT_SECRET"

# Verify it matches what's in .env file
grep JWT_SECRET .env
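To see why a secret mismatch between services produces inconsistent failures, the sketch below recomputes an HS256 signature with the standard library and compares it to the one in the token. It assumes the token is HS256-signed as described earlier; the function names are hypothetical, and the check covers the signature only (not expiration or claims).

```python
import base64
import hashlib
import hmac


def hs256_signature(signing_input: str, secret: str) -> str:
    """Return the base64url HS256 signature for a 'header.payload' string."""
    digest = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode()


def signature_matches(token: str, secret: str) -> bool:
    """Check whether the token's signature was produced with this secret."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        # No "." in the token, so there is no signature segment to compare.
        return False
    expected = hs256_signature(signing_input, secret)
    return hmac.compare_digest(expected.encode(), signature.encode())
```

Running `signature_matches(token, secret)` with the secret from each instance's environment shows which instance, if any, is using a different value.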
Verify client settings for JWT expiration:
Definable's JWT expiration is configured in the settings.py file
Error: "Invalid authorization" or "Missing authentication credentials"
Authentication works in some tools (e.g., Postman) but not in your code
Solutions:
Check header format against JWTBearer implementation:
# Correct format for Definable's JWTBearer middleware
headers = {
    "Authorization": f"Bearer {token}"  # Note the space after "Bearer"
}

# Incorrect formats that will fail with Definable's JWTBearer:
# headers = {"Authorization": token}  # Missing 'Bearer' prefix
# headers = {"Authorization": f"bearer {token}"}  # Case sensitive, must be 'Bearer'
# headers = {"Authorization": f"Bearer{token}"}  # Missing space after 'Bearer'
Verify token inclusion in all requests:
// Using a request interceptor (axios example)
axios.interceptors.request.use(
  config => {
    const token = localStorage.getItem('token');
    if (token) {
      config.headers.Authorization = `Bearer ${token}`;
    }
    return config;
  },
  error => Promise.reject(error)
);
Check request object in JWTBearer middleware:
Definable's JWTBearer middleware extracts the token from either an HTTP Request or a WebSocket
For HTTP, it checks credentials.scheme == "Bearer"
For WebSocket, it checks websocket.query_params.get("token")
# For debugging JWT extraction in your middleware:
def debug_token_extraction(request=None, websocket=None):
    if request:
        auth_header = request.headers.get("Authorization", "")
        print(f"Auth header: {auth_header}")

        if auth_header.startswith("Bearer "):
            token = auth_header[7:]  # Remove 'Bearer ' prefix
            print(f"Extracted token: {token[:10]}...")
        else:
            print("Invalid auth header format")
    elif websocket:
        token = websocket.query_params.get("token")
        print(f"WebSocket token query param: {token[:10] if token else 'None'}...")
    else:
        print("No request or websocket provided")
Check for header modifications in proxies:
Some proxies may strip or modify authentication headers
Verify headers reach the API intact using debugging tools
# Definable JWT payload typically contains:
# - id: The user's UUID
# - exp: Expiration timestamp

# When accessed with RBAC middleware, additional claims are added:
# - org_id: Current organization context
# - required_permission: The permission used for the request
# - role: User's role name
# - role_level: User's role hierarchy level
# - role_id: User's role UUID
# - permissions: List of permissions the user has

# Decode token to check contents:
import jwt
from datetime import datetime

def inspect_token(token):
    try:
        # Decode without verification for inspection
        payload = jwt.decode(token, options={"verify_signature": False})
        print(f"User ID: {payload.get('id')}")
        print(f"Expiration: {datetime.fromtimestamp(payload.get('exp')).isoformat()}")
        print(f"Organization ID: {payload.get('org_id', 'Not present')}")
        print(f"Role: {payload.get('role', 'Not present')}")
        return payload
    except Exception as e:
        print(f"Failed to decode token: {e}")
        return None
Implement client-side security measures:
// Since Definable doesn't implement token revocation by default,
// implement these client-side security measures:

// 1. Store token securely
function securelyStoreToken(token) {
  // Use HttpOnly cookies in production if possible
  // For SPAs, localStorage is often used despite security concerns
  localStorage.setItem('access_token', token);

  // Also store token issue time for tracking
  localStorage.setItem('token_issued_at', Date.now().toString());
}

// 2. Implement logout across tabs
window.addEventListener('storage', (event) => {
  if (event.key === 'access_token' && !event.newValue) {
    // Token was removed in another tab, logout in this tab too
    window.location.href = '/login';
  }
});

// 3. Clear token on suspicious activity
function detectSuspiciousActivity() {
  const issuedAt = parseInt(localStorage.getItem('token_issued_at') || '0', 10);
  const maxSessionTime = 12 * 60 * 60 * 1000; // 12 hours

  if (Date.now() - issuedAt > maxSessionTime) {
    // Session too long, force re-auth
    logout();
  }
}
Configure JWT validity period:
Definable uses settings.jwt_expire_minutes to control token lifetime
Check your .env file and update JWT_EXPIRE_MINUTES to an appropriate value:
# Recommended values based on security requirements:
# High security: 15-30 minutes
# Medium security: 1-2 hours
# Low security: 8-24 hours
JWT_EXPIRE_MINUTES=60
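To reason about how a given JWT_EXPIRE_MINUTES value translates into the `exp` claim you see when decoding a token, here is a small sketch. It assumes the expiration is simply the issue time plus the configured minutes; `expiry_timestamp` is a hypothetical helper, and Definable's actual token creation code may differ.

```python
from datetime import datetime, timedelta, timezone


def expiry_timestamp(expire_minutes: int, issued_at: datetime) -> int:
    """Return the 'exp' claim (Unix seconds) for a token issued at issued_at."""
    return int((issued_at + timedelta(minutes=expire_minutes)).timestamp())


# Example: a token issued at 12:00 UTC with JWT_EXPIRE_MINUTES=60
issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
exp = expiry_timestamp(60, issued)
print(datetime.fromtimestamp(exp, tz=timezone.utc).isoformat())
# → 2024-01-01T13:00:00+00:00
```

Comparing this expected value against the `exp` in a real token is a quick way to confirm that the setting you changed is the one the server actually loaded.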
Additional security for production:
Definable doesn't include fingerprinting or IP validation by default
For production, consider implementing an enhanced authentication middleware:
# Example of enhanced JWT validation middleware
# (JWTBearer, get_db and UserModel come from your Definable codebase)
from datetime import datetime
from uuid import UUID

from fastapi import Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession

async def enhanced_jwt_validation(
    request: Request,
    token_payload: dict = Depends(JWTBearer()),
    session: AsyncSession = Depends(get_db)
):
    user_id = token_payload.get("id")

    # Check if user is still active
    user = await session.get(UserModel, UUID(user_id))
    if not user or not user.is_active:
        raise HTTPException(status_code=403, detail="User account is deactivated")

    # Add last activity tracking
    user.last_active = datetime.now()
    session.add(user)
    await session.commit()

    return token_payload
Error: "Access denied. Required: :" with status code 403
User can access some endpoints but not others
Permissions work for some users but not others
Solutions:
Verify user roles in organization context:
# Definable's RBAC implementation is organization-specific
# Check current user's role in the organization:
response = requests.get(
    f"{API_URL}/roles/list_roles?org_id={org_id}",
    headers={"Authorization": f"Bearer {token}"}
)

if response.status_code == 200:
    roles = response.json()
    print(f"Available roles: {[role['name'] for role in roles]}")

# Get user's permissions
response = requests.get(
    f"{API_URL}/users/me?org_id={org_id}",
    headers={"Authorization": f"Bearer {token}"}
)

if response.status_code == 200:
    user_data = response.json()
    role = user_data.get("role")
    print(f"User role: {role}")
Understand Definable's RBAC middleware:
Definable's RBAC middleware checks for specific resource and action permissions
Routes are protected with: user: dict = Depends(RBAC("resource", "action"))
Permissions follow resource:action pattern, with wildcard support (*)
# Example of Definable's RBAC middleware usage
@app.get("/api/protected")
async def protected_route(user: dict = Depends(RBAC("kb", "read"))):
    # If execution reaches here, the user is authorized
    return {"message": "Access granted", "user_id": user["id"]}

# For wildcards, Definable supports patterns like:
# - "*" (any resource and any action)
# - "kb:*" (any action on kb resource)
# - "*:read" (read action on any resource)
# - "kb:r*" (any action starting with "r" on kb resource)
If the user is not active in the organization, they'll get a 403 error
# Check user's status in organization
member_query = select(OrganizationMemberModel).where(
    and_(
        OrganizationMemberModel.user_id == user_id,
        OrganizationMemberModel.organization_id == org_id
    )
)
members = await session.execute(member_query)
member = members.unique().scalar_one_or_none()

if not member:
    print("User is not a member of this organization")
elif member.status != "active":
    print(f"User's status in organization is: {member.status}")
else:
    print("User is an active member of this organization")
Add missing permissions or update role:
# Assign a role to a user (admin operation)
response = requests.post(
    f"{API_URL}/roles/{org_id}/users/{user_id}/roles",
    headers={"Authorization": f"Bearer {admin_token}"},
    json={"role_id": "editor_role_id"}
)

# Create a new permission
response = requests.post(
    f"{API_URL}/roles/permission",
    headers={"Authorization": f"Bearer {admin_token}"},
    json={
        "resource": "kb",
        "action": "read",
        "description": "Read knowledge base"
    }
)
Debug Definable's RBAC wildcard matching:
# Definable uses a custom wildcard matcher in the RBAC class
# You can recreate it for testing:
def check_wildcard_match(permission_value: str, required_value: str) -> bool:
    """Check if permission matches required value with wildcard support."""
    if permission_value == "*":
        return True

    if "*" not in permission_value:
        return permission_value == required_value

    # Handle pattern matching with wildcards
    pattern_parts = permission_value.split("*")
    value = required_value

    # Check prefix
    if pattern_parts[0] and not required_value.startswith(pattern_parts[0]):
        return False

    # Check suffix
    if pattern_parts[-1] and not required_value.endswith(pattern_parts[-1]):
        return False

    # Check middle parts in order, searching only the text not yet consumed
    for part in pattern_parts[1:-1]:
        if part not in value:
            return False
        # Move past the matched part for next check
        value = value[value.find(part) + len(part):]

    return True

# Test it:
print(check_wildcard_match("kb:*", "kb:read"))  # True
print(check_wildcard_match("*:read", "kb:read"))  # True
print(check_wildcard_match("kb:r*", "kb:read"))  # True
print(check_wildcard_match("kb:w*", "kb:read"))  # False
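As an independent cross-check of the patterns listed above, Python's standard `fnmatch` module offers comparable `*` matching. This is only a sanity check under the assumption that your permission strings use `*` as the sole special character, since `fnmatch` also treats `?` and `[...]` as wildcards; it is not Definable's matcher, and `glob_permission_match` is a hypothetical name.

```python
from fnmatch import fnmatchcase


def glob_permission_match(permission: str, required: str) -> bool:
    """Case-sensitive glob match of a granted permission against a required one."""
    # fnmatchcase(name, pattern): the granted permission is the pattern.
    return fnmatchcase(required, permission)


for granted in ["*", "kb:*", "*:read", "kb:r*", "kb:w*"]:
    print(granted, glob_permission_match(granted, "kb:read"))
```

If this helper and the recreated matcher disagree for one of your permission strings, that string is a good candidate to inspect more closely.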
Role assignment issues
Symptoms:
User doesn't see expected role in their profile
Role assignments don't persist
User loses access after logging out and back in
Solutions:
Verify role was correctly assigned in Definable's database:
# Definable stores role assignments in OrganizationMemberModel
member_query = select(OrganizationMemberModel).where(
    and_(
        OrganizationMemberModel.user_id == user_id,
        OrganizationMemberModel.organization_id == org_id
    )
)

# Check user's current role in organization
response = requests.get(
    f"{API_URL}/organizations/{org_id}/users/{user_id}",
    headers={"Authorization": f"Bearer {admin_token}"}
)

if response.status_code == 200:
    user_data = response.json()
    print(f"User role in this organization: {user_data.get('role', {}).get('name')}")
Check role existence and permissions:
# Get role definition using Definable's roles service
response = requests.get(
    f"{API_URL}/roles/{role_id}?org_id={org_id}",
    headers={"Authorization": f"Bearer {admin_token}"}
)

if response.status_code == 200:
    role_data = response.json()
    print(f"Role name: {role_data.get('name')}")
    print(f"Role hierarchy level: {role_data.get('hierarchy_level')}")
    print(f"Role is system role: {role_data.get('is_system_role')}")
    print(f"Role permissions: {role_data.get('permissions', [])}")
Check for hierarchy level conflicts:
# Definable uses hierarchy_level to determine role precedence
# Higher numbers = higher privilege
# RoleService._validate_hierarchy_level ensures hierarchy levels are unique

# Get all roles in the organization to check hierarchy
response = requests.get(
    f"{API_URL}/roles/list_roles?org_id={org_id}",
    headers={"Authorization": f"Bearer {admin_token}"}
)

if response.status_code == 200:
    roles = response.json()
    # Sort by hierarchy level
    sorted_roles = sorted(roles, key=lambda r: r.get('hierarchy_level', 0), reverse=True)
    for role in sorted_roles:
        print(f"Role: {role.get('name')}, Level: {role.get('hierarchy_level')}")
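Because hierarchy levels are expected to be unique, a duplicate level in the role list is worth flagging directly rather than spotting by eye. The sketch below assumes each role is a dict with `name` and `hierarchy_level` keys, as in the listing above; `find_level_conflicts` is a hypothetical helper.

```python
from collections import defaultdict


def find_level_conflicts(roles: list[dict]) -> dict[int, list[str]]:
    """Map each hierarchy level shared by more than one role to the role names."""
    by_level = defaultdict(list)
    for role in roles:
        by_level[role.get("hierarchy_level")].append(role.get("name"))
    return {level: names for level, names in by_level.items() if len(names) > 1}


# Example with made-up roles:
sample_roles = [
    {"name": "owner", "hierarchy_level": 100},
    {"name": "admin", "hierarchy_level": 90},
    {"name": "editor", "hierarchy_level": 90},
]
print(find_level_conflicts(sample_roles))
# → {90: ['admin', 'editor']}
```

An empty result means no two roles share a level in the data you passed in.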
Recreate role assignment:
# To resolve persistent issues, recreate the role assignment

# 1. First get the user's current organization member record
member_query = select(OrganizationMemberModel).where(
    and_(
        OrganizationMemberModel.user_id == user_id,
        OrganizationMemberModel.organization_id == org_id
    )
)

# 2. Update the role_id with a SQLAlchemy update statement (example)
update_query = (
    update(OrganizationMemberModel)
    .where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    .values(role_id=new_role_id, status="active")
)
await session.execute(update_query)
await session.commit()

# 3. The user needs a new token after role changes
print("Role updated. User must log out and log in again for changes to take effect.")
Permission inheritance and organization context
Symptoms:
User with admin role cannot perform expected actions
# Definable's RBAC is organization-scoped
# The org_id query parameter is required for most endpoints

# Correct request with org_id:
response = requests.get(
    f"{API_URL}/some-endpoint?org_id={org_id}",
    headers={"Authorization": f"Bearer {token}"}
)

# For WebSockets:
websocket = WebSocket(f"wss://{API_URL}/ws/connect?token={token}&org_id={org_id}")

# RBAC middleware checks:
# 1. User has org_id in request
# 2. User is an active member of that organization
# 3. User has appropriate permissions in that organization
Debug missing organization context:
# If getting: "Invalid org id" errors:

# For HTTP requests, check:
if request:
    org_id = request.query_params.get("org_id")
    print(f"Request org_id: {org_id}")
# For WebSockets, check:
elif websocket:
    org_id = websocket.query_params.get("org_id")
    print(f"WebSocket org_id: {org_id}")

# If org_id is None, you need to add it to your request
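When `org_id` turns out to be missing, one option on the client side is to append it to the URL before sending the request. The following is a minimal standard-library sketch; `with_org_id` is a hypothetical helper and the example URL is made up. It leaves an existing `org_id` untouched and, because it rebuilds the query from a dict, collapses repeated query keys to their last value.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_org_id(url: str, org_id: str) -> str:
    """Return url with an org_id query parameter added if it is not already set."""
    parts = urlsplit(url)
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    query.setdefault("org_id", org_id)
    return urlunsplit(parts._replace(query=urlencode(query)))


print(with_org_id("https://api.example.com/kb?page=2", "org-1"))
# → https://api.example.com/kb?page=2&org_id=org-1
```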
Verify permission scope in Definable:
# In Definable's implementation, permissions are tied to:
# 1. A user's role in a specific organization
# 2. The PermissionModel defines resource & action
# 3. RolePermissionModel links roles to permissions

# Query to check what permissions are assigned to a role:
role_perms_query = (
    select(RolePermissionModel, PermissionModel)
    .join(PermissionModel, RolePermissionModel.permission_id == PermissionModel.id)
    .where(RolePermissionModel.role_id == role_id)
)

# List a user's permissions in an organization:
async def list_user_permissions(user_id, org_id, session):
    # Get user's role in organization
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id,
            OrganizationMemberModel.status == "active"
        )
    )
    member = await session.execute(member_query)
    member = member.scalar_one_or_none()

    if not member:
        return []

    # Get permissions for this role
    role_perms_query = (
        select(PermissionModel)
        .join(RolePermissionModel, PermissionModel.id == RolePermissionModel.permission_id)
        .where(RolePermissionModel.role_id == member.role_id)
    )
    perms = await session.execute(role_perms_query)
    return list(perms.scalars().all())
Handle multi-organization scenarios:
// For clients working with multiple organizations:

// Store current organization context
function setCurrentOrganization(orgId) {
  localStorage.setItem('current_org_id', orgId);
}

// Add organization context to all API requests
axios.interceptors.request.use(config => {
  const orgId = localStorage.getItem('current_org_id');
  if (orgId) {
    // For GET requests
    if (!config.params) {
      config.params = {};
    }
    if (!config.params.org_id) {
      config.params.org_id = orgId;
    }

    // For POST/PUT requests with JSON body
    if (config.data && typeof config.data === 'object' && !config.data.org_id) {
      config.data.org_id = orgId;
    }
  }
  return config;
});
# Definable uses verify_password from auth_util:
def verify_password(plain_password, hashed_password):
    """Verify a password against a hash."""
    return pwd_context.verify(plain_password, hashed_password)

# Passwords are hashed with bcrypt through passlib's CryptContext
# If the hashing algorithm was changed, older passwords may fail verification
Check for account status issues:
# Definable checks user status during login
# Check user status in the database:
user_query = select(UserModel).where(UserModel.email == email)
user = await session.execute(user_query)
user = user.scalar_one_or_none()

if not user:
    print("User not found in database")
else:
    print(f"User status: {user.status}")
    # Status can be "active", "inactive", "pending", etc.
    # Only "active" users can log in
Check for email verification requirements:
# Definable may require email verification before login
# Check the email verification status:
user_query = select(UserModel).where(UserModel.email == email)
user = await session.execute(user_query)
user = user.scalar_one_or_none()

if user and not user.email_verified:
    print("Email not verified. Verification required before login.")

    # Resend verification email if needed:
    response = requests.post(
        f"{API_URL}/auth/resend-verification",
        json={"email": email}
    )
Handle login lockouts:
// Definable's implementation may have rate limiting on login attempts
// Check for specific error messages indicating lockout

// If using RateLimiter middleware from ratelimit.py:
// Typical lockout message: "Rate limit exceeded"

// To reset a locked account (admin operation):
// 1. Reset the user's rate limit counter in Redis
// 2. Or enable a temporary bypass for that user

// Example client-side exponential backoff:
async function loginWithBackoff(credentials, maxRetries = 5) {
  let retryCount = 0;
  let delay = 1000; // Start with 1 second

  while (retryCount < maxRetries) {
    try {
      const response = await fetch(`${API_URL}/auth/login`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(credentials)
      });

      if (response.ok) {
        return await response.json();
      }

      const error = await response.json();
      // Guard against responses that carry no "detail" field
      if ((error.detail || '').includes("Rate limit exceeded")) {
        console.log(`Rate limited. Waiting ${delay / 1000} seconds before retry.`);
        await new Promise(resolve => setTimeout(resolve, delay));
        delay *= 2; // Exponential backoff
        retryCount++;
      } else {
        // Other error, not rate limiting
        throw new Error(error.detail);
      }
    } catch (err) {
      console.error("Login error:", err);
      throw err;
    }
  }

  throw new Error("Max retries exceeded for login");
}
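For Python clients, the same doubling schedule used in the backoff example above can be computed up front, which makes it easy to see the total time a login attempt may spend waiting. This is a sketch of the delay calculation only; `backoff_delays` is a hypothetical helper and its defaults mirror the 1-second start and 5 retries shown above.

```python
def backoff_delays(max_retries: int = 5, base_delay: float = 1.0, factor: float = 2.0) -> list[float]:
    """Return the wait time in seconds before each retry, growing geometrically."""
    return [base_delay * factor ** attempt for attempt in range(max_retries)]


delays = backoff_delays()
print(delays)       # → [1.0, 2.0, 4.0, 8.0, 16.0]
print(sum(delays))  # → 31.0
```

With the defaults, a client that is rate limited on every attempt waits 31 seconds in total before giving up.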
# Definable validates invite tokens by:
# 1. Checking if invite exists in the database
# 2. Verifying it's not expired
# 3. Ensuring it's not already used

# Common issues:
# - Token expired: invites have expiration timestamps
# - Token already used: invite can be used only once
# - Email mismatch: token is tied to a specific email

# Check invitation in database:
invite_query = select(InvitationModel).where(InvitationModel.token == token)
invite = await session.execute(invite_query)
invite = invite.scalar_one_or_none()

if not invite:
    print("Invitation not found")
elif invite.used:
    print("Invitation already used")
elif invite.expires_at < datetime.utcnow():
    print("Invitation expired")
else:
    print("Invitation valid")
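The checks listed above, including the email mismatch case that the database snippet does not cover, can be expressed as one pure function that is easy to test without a database. This is a sketch under assumptions: the `Invite` dataclass stands in for InvitationModel, its `email` field name is a guess (the `used` and `expires_at` names come from the snippet above), and the case-insensitive email comparison is a choice made here, not confirmed Definable behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Invite:
    """Stand-in for InvitationModel; the email field name is an assumption."""
    email: str
    used: bool
    expires_at: datetime


def invite_status(invite: Optional[Invite], email: str, now: datetime) -> str:
    """Classify an invitation as not_found, used, expired, email_mismatch or valid."""
    if invite is None:
        return "not_found"
    if invite.used:
        return "used"
    if invite.expires_at < now:
        return "expired"
    if invite.email.lower() != email.lower():
        return "email_mismatch"
    return "valid"


now = datetime(2024, 1, 1, 12, 0)
invite = Invite(email="user@example.com", used=False, expires_at=now + timedelta(days=1))
print(invite_status(invite, "user@example.com", now))  # → valid
```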