JWT Token Issues
Token validation failures
Symptoms:
- Error: "Invalid or expired token"
- Authentication works initially but fails after some time
- Inconsistent authentication errors
- Check token expiration:

    import jwt
    from datetime import datetime

    # Decode token without verification to check expiration
    try:
        payload = jwt.decode(token, options={"verify_signature": False})
        exp = payload.get("exp", 0)
        current_time = datetime.now().timestamp()
        if current_time > exp:
            print(f"Token expired {(current_time - exp) / 60:.1f} minutes ago")
        else:
            print(f"Token valid for {(exp - current_time) / 60:.1f} more minutes")
    except Exception as e:
        print(f"Failed to decode token: {e}")
- Implement proper token refresh:

    from datetime import datetime

    # Definable doesn't use refresh tokens by default
    # You'll need to re-login when tokens expire
    # Default expiration time is set in settings.jwt_expire_minutes

    # For client-side expiration check:
    # token_exp is the "exp" claim from the decoded payload;
    # redirect_to_login is your application's own handler.
    async def should_refresh_auth(token_exp: float) -> bool:
        current_time = datetime.now().timestamp()
        if token_exp - current_time < 300:  # 5 minutes buffer
            # Redirect to login
            redirect_to_login()
            return True
        return False
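The expiration check above needs the `exp` claim from the token. A minimal sketch of reading it with only the standard library follows; the function names `seconds_until_expiry` and `needs_relogin` are illustrative and not part of Definable. The payload is decoded without signature verification, so this is suitable only for deciding when to send the user back to login, never for authorization.

```python
import base64
import json
import time
from typing import Optional


def seconds_until_expiry(token: str, now: Optional[float] = None) -> float:
    """Return the seconds left before the token's exp claim (negative if expired)."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with padding stripped; restore it.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    current = time.time() if now is None else now
    return float(payload["exp"]) - current


def needs_relogin(token: str, buffer_seconds: int = 300, now: Optional[float] = None) -> bool:
    """True when the token expires within buffer_seconds (5 minutes by default)."""
    return seconds_until_expiry(token, now) < buffer_seconds
```

The `now` parameter exists only to make the check easy to test; in normal use it can be omitted.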
- Verify token format:

    import base64

    # Definable uses HS256 algorithm for JWT
    # Tokens should have header.payload.signature format
    def is_valid_jwt_format(token):
        parts = token.split(".")
        if len(parts) != 3:
            return False
        try:
            # Header and payload should be valid base64url (JWTs strip the padding)
            for part in parts[0:2]:
                padding = "=" * (-len(part) % 4)
                base64.urlsafe_b64decode(part + padding)
            return True
        except Exception:
            return False
- Check JWT secret in environment:
  - Verify that the JWT_SECRET environment variable is properly set
  - Ensure the same secret is used across all services/instances
  - Verify that it matches the value in your .env file

    # Check JWT secret in environment variables
    echo $JWT_SECRET

    # Verify it matches what's in .env file
    grep JWT_SECRET .env
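When services disagree about the secret, a token issued by one instance fails validation on another. The sketch below checks whether a given HS256 token was signed with the secret you have at hand, using only the standard library. The function name `signature_matches_secret` is illustrative, and the sketch assumes the secret is used as a UTF-8 string key, which is how PyJWT treats string keys.

```python
import base64
import hashlib
import hmac


def signature_matches_secret(token: str, secret: str) -> bool:
    """Return True if the HS256 signature of token was produced with secret."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
        # Restore the base64url padding that JWTs strip before decoding.
        padded = signature_b64 + "=" * (-len(signature_b64) % 4)
        actual = base64.urlsafe_b64decode(padded)
    except ValueError:
        # Wrong number of segments, non-ASCII input, or invalid base64.
        return False
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(expected, actual)
```

Running this on each instance with its own `JWT_SECRET` value and the same token shows quickly which instance holds a different secret. It does not check `exp` or any other claim.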
- Verify client settings for JWT expiration:
  - Definable's JWT expiration is configured in the settings.py file
  - Default setting is jwt_expire_minutes
  - The token creation uses this setting:

    # From utils/auth_util.py
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.now(timezone.utc) + expires_delta
        else:
            expire = datetime.now(timezone.utc) + timedelta(minutes=15)  # Default fallback
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm="HS256")
        return encoded_jwt
Authorization header problems
Symptoms:
- Error: "Invalid authorization" or "Missing authentication credentials"
- Authentication works in some tools (e.g., Postman) but not in your code
- Check header format against JWTBearer implementation:

    # Correct format for Definable's JWTBearer middleware
    headers = {
        "Authorization": f"Bearer {token}"  # Note the space after "Bearer"
    }

    # Incorrect formats that will fail with Definable's JWTBearer:
    # headers = {"Authorization": token}              # Missing 'Bearer' prefix
    # headers = {"Authorization": f"bearer {token}"}  # Case sensitive, must be 'Bearer'
    # headers = {"Authorization": f"Bearer{token}"}   # Missing space after 'Bearer'
- Verify token inclusion in all requests:

    // Using a request interceptor (axios example)
    axios.interceptors.request.use(
      config => {
        const token = localStorage.getItem('token');
        if (token) {
          config.headers.Authorization = `Bearer ${token}`;
        }
        return config;
      },
      error => Promise.reject(error)
    );
- Check request object in JWTBearer middleware:
  - Definable's JWTBearer middleware extracts the token from either an HTTP Request or a WebSocket
  - For HTTP, it checks credentials.scheme == "Bearer"
  - For WebSocket, it checks websocket.query_params.get("token")

    # For debugging JWT extraction in your middleware:
    def debug_token_extraction(request=None, websocket=None):
        if request:
            auth_header = request.headers.get("Authorization", "")
            print(f"Auth header: {auth_header}")
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]  # Remove 'Bearer ' prefix
                print(f"Extracted token: {token[:10]}...")
            else:
                print("Invalid auth header format")
        elif websocket:
            token = websocket.query_params.get("token")
            print(f"WebSocket token query param: {token[:10] if token else 'None'}...")
        else:
            print("No request or websocket provided")
- Check for header modifications in proxies:
  - Some proxies may strip or modify authentication headers
  - Verify headers reach the API intact using debugging tools

    curl -v -H "Authorization: Bearer your-token" https://api.example.com/endpoint
Token security with Definable's implementation
Symptoms:
- Unexpected logins from unknown locations
- Security alerts about unauthorized access
- Token becoming invalid unexpectedly
- Understand Definable's token content:

    # Definable JWT payload typically contains:
    # - id: The user's UUID
    # - exp: Expiration timestamp
    # When accessed with RBAC middleware, additional claims are added:
    # - org_id: Current organization context
    # - required_permission: The permission used for the request
    # - role: User's role name
    # - role_level: User's role hierarchy level
    # - role_id: User's role UUID
    # - permissions: List of permissions the user has

    # Decode token to check contents:
    import jwt
    from datetime import datetime

    def inspect_token(token):
        try:
            # Decode without verification for inspection
            payload = jwt.decode(token, options={"verify_signature": False})
            print(f"User ID: {payload.get('id')}")
            print(f"Expiration: {datetime.fromtimestamp(payload.get('exp')).isoformat()}")
            print(f"Organization ID: {payload.get('org_id', 'Not present')}")
            print(f"Role: {payload.get('role', 'Not present')}")
            return payload
        except Exception as e:
            print(f"Failed to decode token: {e}")
            return None
- Implement client-side security measures:
// Since Definable doesn't implement token revocation by default, // implement these client-side security measures: // 1. Store token securely function securelyStoreToken(token) { // Use HttpOnly cookies in production if possible // For SPAs, localStorage is often used despite security concerns localStorage.setItem('access_token', token); // Also store token issue time for tracking localStorage.setItem('token_issued_at', Date.now().toString()); } // 2. Implement logout across tabs window.addEventListener('storage', (event) => { if (event.key === 'access_token' && !event.newValue) { // Token was removed in another tab, logout in this tab too window.location.href = '/login'; } }); // 3. Clear token on suspicious activity function detectSuspiciousActivity() { const issuedAt = parseInt(localStorage.getItem('token_issued_at') || '0'); const maxSessionTime = 12 * 60 * 60 * 1000; // 12 hours if (Date.now() - issuedAt > maxSessionTime) { // Session too long, force re-auth logout(); } }
- Configure JWT validity period:
  - Definable uses settings.jwt_expire_minutes to control token lifetime
  - Check your .env file and update JWT_EXPIRE_MINUTES to an appropriate value:

    # Recommended values based on security requirements:
    # High security: 15-30 minutes
    # Medium security: 1-2 hours
    # Low security: 8-24 hours
    JWT_EXPIRE_MINUTES=60
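To see how a `JWT_EXPIRE_MINUTES` value translates into a token's `exp` claim, the following sketch computes the expiry time from an environment mapping. It is an illustration rather than Definable's settings code: the function `token_expiry` is hypothetical, and the 15-minute default is an assumption borrowed from the fallback shown in `create_access_token`.

```python
import os
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional

# Assumed default, taken from the fallback in create_access_token.
DEFAULT_EXPIRE_MINUTES = 15


def token_expiry(env: Mapping[str, str] = os.environ,
                 now: Optional[datetime] = None) -> datetime:
    """Return the UTC time at which a token issued at `now` would expire."""
    raw = env.get("JWT_EXPIRE_MINUTES")
    minutes = int(raw) if raw else DEFAULT_EXPIRE_MINUTES
    if minutes <= 0:
        raise ValueError("JWT_EXPIRE_MINUTES must be a positive integer")
    issued_at = now or datetime.now(timezone.utc)
    return issued_at + timedelta(minutes=minutes)
```

Calling `token_expiry()` after editing `.env` (and reloading the environment) gives a quick sanity check that the value is being parsed as intended.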
- Additional security for production:
  - Definable doesn't include fingerprinting or IP validation by default
  - For production, consider implementing an enhanced authentication middleware:

    # Example of enhanced JWT validation middleware
    async def enhanced_jwt_validation(
        request: Request,
        token_payload: dict = Depends(JWTBearer()),
        session: AsyncSession = Depends(get_db)
    ):
        user_id = token_payload.get("id")

        # Check if user is still active
        user = await session.get(UserModel, UUID(user_id))
        if not user or not user.is_active:
            raise HTTPException(status_code=403, detail="User account is deactivated")

        # Add last activity tracking
        user.last_active = datetime.now()
        session.add(user)
        await session.commit()

        return token_payload
RBAC Permission Issues
Insufficient permissions
Symptoms:
- Error: "Access denied. Required: :" with status code 403
- User can access some endpoints but not others
- Permissions work for some users but not others
- Verify user roles in organization context:
# Definable's RBAC implementation is organization-specific # Check current user's role in the organization: response = requests.get( f"{API_URL}/roles/list_roles?org_id={org_id}", headers={"Authorization": f"Bearer {token}"} ) if response.status_code == 200: roles = response.json() print(f"Available roles: {[role['name'] for role in roles]}") # Get user's permissions response = requests.get( f"{API_URL}/users/me?org_id={org_id}", headers={"Authorization": f"Bearer {token}"} ) if response.status_code == 200: user_data = response.json() role = user_data.get("role") print(f"User role: {role}")
- Understand Definable's RBAC middleware:
  - Definable's RBAC middleware checks for specific resource and action permissions
  - Routes are protected with: user: dict = Depends(RBAC("resource", "action"))
  - Permissions follow the resource:action pattern, with wildcard support (*)

    # Example of Definable's RBAC middleware usage
    @app.get("/api/protected")
    async def protected_route(user: dict = Depends(RBAC("kb", "read"))):
        # If execution reaches here, the user is authorized
        return {"message": "Access granted", "user_id": user["id"]}

    # For wildcards, Definable supports patterns like:
    # - "*" (any resource and any action)
    # - "kb:*" (any action on kb resource)
    # - "*:read" (read action on any resource)
    # - "kb:r*" (any action starting with "r" on kb resource)
- Check organization membership status:
  - Definable's RBAC checks OrganizationMemberModel.status == "active"
  - If the user is not active in the organization, they'll get a 403 error

    # Check user's status in organization
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    members = await session.execute(member_query)
    member = members.unique().scalar_one_or_none()

    if not member:
        print("User is not a member of this organization")
    elif member.status != "active":
        print(f"User's status in organization is: {member.status}")
    else:
        print("User is an active member of this organization")
- Add missing permissions or update role:
# Assign a role to a user (admin operation) response = requests.post( f"{API_URL}/roles/{org_id}/users/{user_id}/roles", headers={"Authorization": f"Bearer {admin_token}"}, json={"role_id": "editor_role_id"} ) # Create a new permission response = requests.post( f"{API_URL}/roles/permission", headers={"Authorization": f"Bearer {admin_token}"}, json={ "resource": "kb", "action": "read", "description": "Read knowledge base" } )
- Debug Definable's RBAC wildcard matching:

    # Definable uses a custom wildcard matcher in the RBAC class
    # You can recreate it for testing:
    def check_wildcard_match(permission_value: str, required_value: str) -> bool:
        """Check if permission matches required value with wildcard support."""
        if permission_value == "*":
            return True
        if "*" not in permission_value:
            return permission_value == required_value

        # Handle pattern matching with wildcards
        pattern_parts = permission_value.split("*")
        prefix, suffix = pattern_parts[0], pattern_parts[-1]

        # Prefix and suffix must both fit without overlapping
        if len(required_value) < len(prefix) + len(suffix):
            return False
        if not required_value.startswith(prefix):
            return False
        if not required_value.endswith(suffix):
            return False

        # Check middle parts in order, searching only the unmatched remainder
        value = required_value[len(prefix):len(required_value) - len(suffix)]
        for part in pattern_parts[1:-1]:
            index = value.find(part)
            if index == -1:
                return False
            # Move past the matched part for next check
            value = value[index + len(part):]
        return True

    # Test it:
    print(check_wildcard_match("kb:*", "kb:read"))   # True
    print(check_wildcard_match("*:read", "kb:read")) # True
    print(check_wildcard_match("kb:r*", "kb:read"))  # True
    print(check_wildcard_match("kb:w*", "kb:read"))  # False
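For a quick check of a whole permission list against one required permission, a short sketch using the standard library's `fnmatch.fnmatchcase` can stand in for the custom matcher. This is a substitute technique, not Definable's implementation, and `has_permission` is a hypothetical helper. Note that `fnmatchcase` also treats `?` and `[...]` as wildcards, so it is only equivalent when permission strings contain neither.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def has_permission(granted: Iterable[str], resource: str, action: str) -> bool:
    """Return True if any granted pattern matches the required resource:action."""
    required = f"{resource}:{action}"
    # fnmatchcase(name, pattern) is case-sensitive on every platform.
    return any(fnmatchcase(required, pattern) for pattern in granted)


if __name__ == "__main__":
    user_permissions = ["kb:*", "agents:read"]
    print(has_permission(user_permissions, "kb", "write"))
    print(has_permission(user_permissions, "agents", "delete"))
```

Feeding it the `permissions` claim from a decoded token makes it easy to see which pattern, if any, should have granted access.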
Role assignment issues
Symptoms:
- User doesn't see expected role in their profile
- Role assignments don't persist
- User loses access after logging out and back in
- Verify role was correctly assigned in Definable's database:
# Definable stores role assignments in OrganizationMemberModel member_query = select(OrganizationMemberModel).where( and_( OrganizationMemberModel.user_id == user_id, OrganizationMemberModel.organization_id == org_id ) ) # Check user's current role in organization response = requests.get( f"{API_URL}/organizations/{org_id}/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}"} ) if response.status_code == 200: user_data = response.json() print(f"User role in this organization: {user_data.get('role', {}).get('name')}")
- Check role existence and permissions:
# Get role definition using Definable's roles service response = requests.get( f"{API_URL}/roles/{role_id}?org_id={org_id}", headers={"Authorization": f"Bearer {admin_token}"} ) if response.status_code == 200: role_data = response.json() print(f"Role name: {role_data.get('name')}") print(f"Role hierarchy level: {role_data.get('hierarchy_level')}") print(f"Role is system role: {role_data.get('is_system_role')}") print(f"Role permissions: {role_data.get('permissions', [])}")
- Check for hierarchy level conflicts:
# Definable uses hierarchy_level to determine role precedence # Higher numbers = higher privilege # RoleService._validate_hierarchy_level ensures hierarchy levels are unique # Get all roles in the organization to check hierarchy response = requests.get( f"{API_URL}/roles/list_roles?org_id={org_id}", headers={"Authorization": f"Bearer {admin_token}"} ) if response.status_code == 200: roles = response.json() # Sort by hierarchy level sorted_roles = sorted(roles, key=lambda r: r.get('hierarchy_level', 0), reverse=True) for role in sorted_roles: print(f"Role: {role.get('name')}, Level: {role.get('hierarchy_level')}")
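Since hierarchy levels are expected to be unique, it can help to scan the role list for duplicates directly. The sketch below works on the list of role dictionaries returned by the request above; the helper name `find_hierarchy_conflicts` and the sample role names are illustrative.

```python
from collections import defaultdict
from typing import Dict, List


def find_hierarchy_conflicts(roles: List[dict]) -> Dict[int, List[str]]:
    """Group role names by hierarchy_level and return levels used more than once."""
    names_by_level = defaultdict(list)
    for role in roles:
        names_by_level[role.get("hierarchy_level")].append(role.get("name"))
    return {level: names for level, names in names_by_level.items() if len(names) > 1}


if __name__ == "__main__":
    sample_roles = [
        {"name": "owner", "hierarchy_level": 100},
        {"name": "admin", "hierarchy_level": 90},
        {"name": "editor", "hierarchy_level": 90},
    ]
    print(find_hierarchy_conflicts(sample_roles))
```

An empty result means every role has its own level.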
- Recreate role assignment:

    # To resolve persistent issues, recreate the role assignment

    # 1. First get the user's current organization member record
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    member = (await session.execute(member_query)).scalar_one_or_none()

    # 2. Update the role_id with a SQLAlchemy update statement (example)
    update_query = (
        update(OrganizationMemberModel)
        .where(
            and_(
                OrganizationMemberModel.user_id == user_id,
                OrganizationMemberModel.organization_id == org_id
            )
        )
        .values(role_id=new_role_id, status="active")
    )
    await session.execute(update_query)
    await session.commit()

    # 3. The user needs a new token after role changes
    print("Role updated. User must log out and log in again for changes to take effect.")
Permission inheritance and organization context
Symptoms:
- User with admin role cannot perform expected actions
- Permissions don't apply across organizations
- Errors reporting that the required org_id parameter is missing
- Understand Definable's organization-based permissions:

    # Definable's RBAC is organization-scoped
    # The org_id query parameter is required for most endpoints

    # Correct request with org_id:
    response = requests.get(
        f"{API_URL}/some-endpoint?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )

    # For WebSockets, pass both values as query parameters in the connection URL
    # (API_URL here is the bare host, without the https:// scheme):
    ws_url = f"wss://{API_URL}/ws/connect?token={token}&org_id={org_id}"

    # RBAC middleware checks:
    # 1. The request includes org_id
    # 2. The user is an active member of that organization
    # 3. The user has appropriate permissions in that organization
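The three checks can be modeled as a small pure function to reason about which one a failing request is tripping over. This is a simplified sketch, not Definable's middleware: the `Membership` type and `diagnose_access` function are hypothetical, and permission matching is reduced to an exact match or a bare `*`.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass
class Membership:
    org_id: str
    status: str
    permissions: Tuple[str, ...]


def diagnose_access(org_id: Optional[str],
                    memberships: Iterable[Membership],
                    required: str) -> str:
    """Walk the three checks in order and report the first one that fails."""
    # 1. The request must carry an org_id.
    if not org_id:
        return "missing org_id"
    # 2. The user must be an active member of that organization.
    membership = next((m for m in memberships if m.org_id == org_id), None)
    if membership is None or membership.status != "active":
        return "not an active member"
    # 3. The role's permissions must cover the required resource:action.
    if required not in membership.permissions and "*" not in membership.permissions:
        return "missing permission"
    return "ok"
```

Because permissions are scoped per organization, a user who passes in one organization can fail step 2 or 3 in another with the same token.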
- Debug missing organization context:
# If getting: "Invalid org id" errors: # For HTTP requests, check: if request: org_id = request.query_params.get("org_id") print(f"Request org_id: {org_id}") # For WebSockets, check: elif websocket: org_id = websocket.query_params.get("org_id") print(f"WebSocket org_id: {org_id}") # If org_id is None, you need to add it to your request
- Verify permission scope in Definable:
# In Definable's implementation, permissions are tied to: # 1. A user's role in a specific organization # 2. The PermissionModel defines resource & action # 3. RolePermissionModel links roles to permissions # Query to check what permissions are assigned to a role: role_perms_query = ( select(RolePermissionModel, PermissionModel) .join(PermissionModel, RolePermissionModel.permission_id == PermissionModel.id) .where(RolePermissionModel.role_id == role_id) ) # List a user's permissions in an organization: async def list_user_permissions(user_id, org_id, session): # Get user's role in organization member_query = select(OrganizationMemberModel).where( and_( OrganizationMemberModel.user_id == user_id, OrganizationMemberModel.organization_id == org_id, OrganizationMemberModel.status == "active" ) ) member = await session.execute(member_query) member = member.scalar_one_or_none() if not member: return [] # Get permissions for this role role_perms_query = ( select(PermissionModel) .join(RolePermissionModel, PermissionModel.id == RolePermissionModel.permission_id) .where(RolePermissionModel.role_id == member.role_id) ) perms = await session.execute(role_perms_query) return list(perms.scalars().all())
- Handle multi-organization scenarios:
// For clients working with multiple organizations: // Store current organization context function setCurrentOrganization(orgId) { localStorage.setItem('current_org_id', orgId); } // Add organization context to all API requests axios.interceptors.request.use(config => { const orgId = localStorage.getItem('current_org_id'); if (orgId) { // For GET requests if (!config.params) { config.params = {}; } if (!config.params.org_id) { config.params.org_id = orgId; } // For POST/PUT requests with JSON body if (config.data && typeof config.data === 'object' && !config.data.org_id) { config.data.org_id = orgId; } } return config; });
Login and Account Issues
Login failures
Symptoms:
- Unable to log in with valid credentials
- Error message: "Incorrect username or password"
- Persistent login failures
- Verify credentials against Definable's auth service:
# Definable's AuthService.post_login uses verify_password to check credentials # Example login request: response = requests.post( f"{API_URL}/auth/login", json={ "email": "user@example.com", "password": "password123" } ) if response.status_code != 200: error_data = response.json() print(f"Login error: {error_data.get('detail')}") else: token_data = response.json() print(f"Login successful. Token obtained: {token_data.get('access_token')[:10]}...")
- Check password verification in Definable:
# Definable uses verify_password from auth_util: def verify_password(plain_password, hashed_password): """Verify a password against a hash.""" return pwd_context.verify(plain_password, hashed_password) # Password hash algorithm is bcrypt used by passlib's CryptContext # If hashing algorithm was changed, older passwords may fail verification
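If a hashing algorithm change is suspected, the stored hash itself usually reveals which scheme produced it, because modular-crypt-style hashes start with a scheme identifier. The sketch below inspects that prefix; `identify_hash_scheme` is a hypothetical helper, and the prefix table covers only a few common schemes.

```python
def identify_hash_scheme(hashed_password: str) -> str:
    """Guess the hashing scheme from the prefix of a stored password hash."""
    prefixes = {
        "$2a$": "bcrypt",
        "$2b$": "bcrypt",
        "$2y$": "bcrypt",
        "$argon2": "argon2",
        "$pbkdf2-sha256$": "pbkdf2_sha256",
    }
    for prefix, scheme in prefixes.items():
        if hashed_password.startswith(prefix):
            return scheme
    return "unknown"


if __name__ == "__main__":
    # Placeholder value: only the prefix matters for identification.
    print(identify_hash_scheme("$2b$12$exampleexampleexampleexample"))
```

A stored hash that does not report `bcrypt` is a likely candidate for the verification failures described above.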
- Check for account status issues:
# Definable checks user status during login # Check user status in the database: user_query = select(UserModel).where(UserModel.email == email) user = await session.execute(user_query) user = user.scalar_one_or_none() if not user: print("User not found in database") else: print(f"User status: {user.status}") # Status can be "active", "inactive", "pending", etc. # Only "active" users can log in
- Check for email verification requirements:
# Definable may require email verification before login # Check if email verified status: user_query = select(UserModel).where(UserModel.email == email) user = await session.execute(user_query) user = user.scalar_one_or_none() if user and not user.email_verified: print("Email not verified. Verification required before login.") # Resend verification email if needed: response = requests.post( f"{API_URL}/auth/resend-verification", json={"email": email} )
- Handle login lockouts:

    // Definable's implementation may have rate limiting on login attempts
    // Check for specific error messages indicating lockout

    // If using RateLimiter middleware from ratelimit.py:
    // Typical lockout message: "Rate limit exceeded"

    // To reset a locked account (admin operation):
    // 1. Reset the user's rate limit counter in Redis
    // 2. Or enable a temporary bypass for that user

    // Example client-side exponential backoff:
    async function loginWithBackoff(credentials, maxRetries = 5) {
      let retryCount = 0;
      let delay = 1000; // Start with 1 second

      while (retryCount < maxRetries) {
        try {
          const response = await fetch(`${API_URL}/auth/login`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(credentials)
          });

          if (response.ok) {
            return await response.json();
          }

          const error = await response.json();
          if (String(error.detail).includes("Rate limit exceeded")) {
            console.log(`Rate limited. Waiting ${delay / 1000} seconds before retry.`);
            await new Promise(resolve => setTimeout(resolve, delay));
            delay *= 2; // Exponential backoff
            retryCount++;
          } else {
            // Other error, not rate limiting
            throw new Error(error.detail);
          }
        } catch (err) {
          console.error("Login error:", err);
          throw err;
        }
      }
      throw new Error("Max retries exceeded for login");
    }
Invitation and signup issues
Symptoms:
- Cannot accept invitation
- Error in signup process
- Invitation links expired or invalid
- Verify invitation token:
# Definable handles invitations in AuthService.post_signup_invite and get_signup_invite # Check invitation status: response = requests.get( f"{API_URL}/auth/signup-invite?token={invitation_token}" ) if response.status_code != 200: error_data = response.json() print(f"Invitation error: {error_data.get('detail')}") else: invite_data = response.json() print(f"Invitation valid for: {invite_data.get('email')}") print(f"Organization: {invite_data.get('organization', {}).get('name')}")
- Debug invitation token issues:
# Definable validates invite tokens by: # 1. Checking if invite exists in the database # 2. Verifying it's not expired # 3. Ensuring it's not already used # Common issues: # - Token expired: invites have expiration timestamps # - Token already used: invite can be used only once # - Email mismatch: token is tied to a specific email # Check invitation in database: invite_query = select(InvitationModel).where(InvitationModel.token == token) invite = await session.execute(invite_query) invite = invite.scalar_one_or_none() if not invite: print("Invitation not found") elif invite.used: print("Invitation already used") elif invite.expires_at < datetime.utcnow(): print("Invitation expired") else: print("Invitation valid")
- Resend invitation if needed:
# Admin can resend invitation response = requests.post( f"{API_URL}/auth/invite", headers={"Authorization": f"Bearer {admin_token}"}, json={ "email": "user@example.com", "org_id": organization_id, "role_id": role_id } ) if response.status_code == 200: print("New invitation sent successfully")
- Complete signup with invitation:
# Complete signup using invitation response = requests.post( f"{API_URL}/auth/signup-invite", json={ "email": "user@example.com", "password": "securePassword123", "first_name": "John", "last_name": "Doe", "token": invitation_token } ) if response.status_code == 201: print("Signup with invitation successful") user_data = response.json() print(f"User created with ID: {user_data.get('id')}") else: error_data = response.json() print(f"Signup error: {error_data.get('detail')}")
Account management issues
Symptoms:
- Cannot update profile information
- Password reset fails
- Email verification issues
- Debug password reset flow:
# Definable's password reset flow: # 1. Request password reset token response = requests.post( f"{API_URL}/auth/forgot-password", json={"email": "user@example.com"} ) if response.status_code == 200: print("Password reset email sent") # 2. Reset password with token response = requests.post( f"{API_URL}/auth/reset-password", json={ "token": "reset_token_from_email", "password": "newSecurePassword123" } ) if response.status_code == 200: print("Password reset successful") else: error_data = response.json() print(f"Password reset error: {error_data.get('detail')}")
- Check email verification status:
# Definable tracks email verification status # Get user profile to check verification status response = requests.get( f"{API_URL}/auth/me", headers={"Authorization": f"Bearer {token}"} ) if response.status_code == 200: user_data = response.json() print(f"Email verified: {user_data.get('email_verified', False)}") # Verify email with token: response = requests.post( f"{API_URL}/auth/verify-email", json={"token": "verification_token_from_email"} ) if response.status_code == 200: print("Email verification successful")
- Update user profile:
# Update user profile in Definable response = requests.put( f"{API_URL}/auth/me", headers={"Authorization": f"Bearer {token}"}, json={ "first_name": "John", "last_name": "Smith", # Other fields as needed } ) if response.status_code == 200: print("Profile updated successfully") else: error_data = response.json() print(f"Profile update error: {error_data.get('detail')}")
- Handle account deletion:
# Definable process for account deactivation # Deactivate (soft delete) account: response = requests.delete( f"{API_URL}/auth/me", headers={"Authorization": f"Bearer {token}"} ) if response.status_code == 200: print("Account deactivated successfully") # Administrator hard delete (permanent) response = requests.delete( f"{API_URL}/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}"} ) if response.status_code == 204: print("User permanently deleted")
WebSocket Authentication Issues
WebSocket connection authentication failures
Symptoms:
- WebSocket connection is rejected
- Error: "Not authenticated" or "Invalid token"
- Connection drops immediately after establishing
- Verify WebSocket connection with token:
// Definable's WebSocket authentication requires token in query parameters // Correct format: const socket = new WebSocket(`wss://${API_URL}/ws/connect?token=${jwt_token}&org_id=${org_id}`); socket.onopen = () => { console.log("WebSocket connection established"); }; socket.onerror = (error) => { console.error("WebSocket error:", error); }; socket.onclose = (event) => { console.log(`WebSocket closed with code: ${event.code}, reason: ${event.reason}`); };
- Debug WebSocket auth errors:

    # Definable's JWTBearer validates WebSocket connections
    # For WebSockets, validate_socket_token is called

    # Server-side debugging:
    async def authenticate_ws_connection(websocket: WebSocket):
        # Extract token from query params
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008, reason="Missing token")
            return None

        # Extract org_id from query params
        org_id = websocket.query_params.get("org_id")
        if not org_id:
            await websocket.close(code=1008, reason="Missing org_id")
            return None

        # Validate JWT token
        try:
            payload = jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
            user_id = payload.get("id")  # Definable's payload stores the user's UUID as "id"
            # Further validation as needed
            return payload
        except Exception as e:
            await websocket.close(code=1008, reason=f"Invalid token: {str(e)}")
            return None
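Because the token and org_id travel as query parameters, a malformed connection URL is an easy source of these errors. The following sketch builds the URL with proper escaping and the matching WebSocket scheme. `build_ws_url` is a hypothetical helper; the `/ws/connect` path is the one used in the examples in this guide.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_ws_url(api_url: str, token: str, org_id: str, path: str = "/ws/connect") -> str:
    """Turn an HTTP(S) API base URL into a WebSocket URL with auth query parameters."""
    parts = urlsplit(api_url)
    # Map http -> ws and https -> wss; leave ws/wss untouched.
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme, parts.scheme)
    # Keep any base path prefix (for example /api) in front of the endpoint path.
    full_path = parts.path.rstrip("/") + path
    # urlencode escapes characters that would otherwise corrupt the query string.
    query = urlencode({"token": token, "org_id": org_id})
    return urlunsplit((scheme, parts.netloc, full_path, query, ""))


if __name__ == "__main__":
    print(build_ws_url("https://api.example.com", "header.payload.signature", "org-123"))
```

Logging the resulting URL (with the token truncated) on the client makes it straightforward to compare against what the server reports in `websocket.query_params`.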
- Handle WebSocket reconnection with token refresh:

    // Client-side token refresh and reconnection
    class AuthenticatedWebSocket {
      constructor(baseUrl, getToken, getOrgId) {
        this.baseUrl = baseUrl;
        this.getToken = getToken;
        this.getOrgId = getOrgId;
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.listeners = { message: [], error: [], close: [], open: [] };
      }

      connect() {
        const token = this.getToken();
        const orgId = this.getOrgId();

        if (!token) {
          console.error("No token available for WebSocket connection");
          return;
        }
        if (!orgId) {
          console.error("No organization ID available for WebSocket connection");
          return;
        }

        // Definable's WebSocket endpoint with required auth parameters
        this.socket = new WebSocket(`${this.baseUrl}/ws/connect?token=${token}&org_id=${orgId}`);

        this.socket.onopen = (event) => {
          console.log("WebSocket connected");
          this.reconnectAttempts = 0;
          this.listeners.open.forEach(listener => listener(event));
        };

        this.socket.onmessage = (event) => {
          this.listeners.message.forEach(listener => listener(event));
        };

        this.socket.onerror = (event) => {
          this.listeners.error.forEach(listener => listener(event));
        };

        this.socket.onclose = (event) => {
          this.listeners.close.forEach(listener => listener(event));

          // Handle authentication failures (1008 = Policy Violation)
          if (event.code === 1008 && event.reason.includes("token")) {
            console.error("WebSocket authentication failed:", event.reason);
            // Try to refresh token before reconnecting
            this.refreshTokenAndReconnect();
          } else {
            // Handle other disconnects; reconnect() applies the backoff delay
            this.reconnect();
          }
        };
      }

      async refreshTokenAndReconnect() {
        try {
          // Implement token refresh logic here
          await refreshAuthToken();
          this.reconnect();
        } catch (error) {
          console.error("Failed to refresh token:", error);
        }
      }

      reconnect() {
        // Stop after maxReconnectAttempts, including repeated auth failures
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
          console.error("Max reconnect attempts reached");
          return;
        }
        this.reconnectAttempts++;
        const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
        console.log(`Attempting to reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);
        setTimeout(() => {
          this.connect();
        }, delay);
      }

      // Event listener methods
      on(event, callback) {
        if (this.listeners[event]) {
          this.listeners[event].push(callback);
        }
        return this;
      }

      close() {
        if (this.socket) {
          this.socket.close();
        }
      }

      send(data) {
        if (this.socket && this.socket.readyState === WebSocket.OPEN) {
          this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
        } else {
          console.error("WebSocket not connected");
        }
      }
    }

    // Usage:
    const ws = new AuthenticatedWebSocket(
      'wss://api.definable.ai',
      () => localStorage.getItem('access_token'),
      () => localStorage.getItem('current_org_id')
    );

    ws.on('open', () => console.log('Connected!'))
      .on('message', (event) => console.log('Received:', event.data))
      .connect();
- Check RBAC permissions for WebSocket connections:

    # Definable's WebSocketService uses RBAC for permission checking
    # WebSocket routes are protected with RBAC dependency

    # In WebSocketService.ws_connect, RBAC dependency is used:
    @websocket_endpoint.websocket("/connect")
    async def ws_connect(
        websocket: WebSocket,
        user: dict = Depends(RBAC("ws", "connect"))
    ):
        # If execution reaches here, user is authorized
        ...

    # Client debugging:
    # If WebSocket connection fails with 403, check user permissions
    response = requests.get(
        f"{API_URL}/roles/list_permissions?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    if response.status_code == 200:
        permissions = response.json()
        # Look for "ws:connect" permission
        has_ws_permission = any(
            p.get("resource") == "ws" and p.get("action") == "connect"
            for p in permissions
        )
        print(f"Has WebSocket connect permission: {has_ws_permission}")
Next Steps
If you've resolved your authentication issues, consider reviewing these related guides:
- JWT Authentication - Detailed information about JWT implementation
- RBAC Permissions - How role-based access control works
- API Errors - Common API errors and their solutions
- Invitation Flow - User invitation and onboarding process