This guide covers common authentication-related issues that may arise when working with the Definable backend.

JWT Token Issues

Symptoms:
  • Error: “Invalid or expired token”
  • Authentication works initially but fails after some time
  • Inconsistent authentication errors
Solutions:
  1. Check token expiration:
    import jwt
    from datetime import datetime
    
    # Decode token without verification to check expiration
    try:
        payload = jwt.decode(token, options={"verify_signature": False})
        exp = payload.get("exp", 0)
        current_time = datetime.now().timestamp()
        
        if current_time > exp:
            print(f"Token expired {(current_time - exp) / 60:.1f} minutes ago")
        else:
            print(f"Token valid for {(exp - current_time) / 60:.1f} more minutes")
    except Exception as e:
        print(f"Failed to decode token: {e}")
    
  2. Implement proper token refresh:
    # Definable doesn't use refresh tokens by default 
    # You'll need to re-login when tokens expire
    # Default expiration time is set in settings.jwt_expire_minutes
    
    # For client-side expiration check:
    async def should_refresh_auth(token_exp: float) -> bool:
        # token_exp is the "exp" claim from the decoded payload (see step 1)
        current_time = datetime.now().timestamp()
        if token_exp - current_time < 300:  # 5 minutes buffer
            # redirect_to_login() stands for your application's own login redirect
            redirect_to_login()
            return True
        return False
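
The check above needs the token's expiration time. As a minimal sketch using only the standard library, the `exp` claim can be read straight from the payload segment. The helper names `decode_jwt_payload` and `needs_reauth` are illustrative and not part of Definable, and the signature is deliberately not verified, so this is only suitable for client-side timing decisions.

```python
import base64
import json
import time
from typing import Optional


def decode_jwt_payload(token: str) -> dict:
    """Return the payload claims of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected header.payload.signature")
    payload_b64 = parts[1]
    # JWT segments are unpadded base64url, so restore the padding first.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def needs_reauth(token: str, buffer_seconds: int = 300,
                 now: Optional[float] = None) -> bool:
    """True when the token expires within buffer_seconds or carries no exp claim."""
    exp = decode_jwt_payload(token).get("exp")
    if exp is None:
        return True
    current = time.time() if now is None else now
    return exp - current < buffer_seconds
```

The `now` parameter exists only so the function can be exercised with a fixed clock; in normal use it can be omitted.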
    
  3. Verify token format:
    import base64
    
    # Definable uses the HS256 algorithm for JWT
    # Tokens should have header.payload.signature format
    def is_valid_jwt_format(token):
        parts = token.split(".")
        if len(parts) != 3:
            return False
        try:
            # Header and payload must be valid base64url (JWTs omit the padding)
            for part in parts[0:2]:
                padding = "=" * (-len(part) % 4)
                base64.urlsafe_b64decode(part + padding)
            return True
        except Exception:
            return False
    
  4. Check JWT secret in environment:
    • Verify that JWT_SECRET environment variable is properly set
    • Ensure the same secret is used across all services/instances
    • Verify that it matches the value in your .env file
    # Check JWT secret in environment variables
    echo $JWT_SECRET
    
    # Verify it matches what's in .env file
    grep JWT_SECRET .env
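
If the secret looks right in both places but tokens are still rejected, one way to confirm which secret actually signed a token is to recompute its HS256 signature locally. The following is a rough sketch with the standard library only. The function name `signed_with_secret` is an assumption for illustration, and the check covers the signature alone: it does not look at expiration or any other claim.

```python
import base64
import hashlib
import hmac


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are unpadded base64url, so restore the padding first.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def signed_with_secret(token: str, secret: str) -> bool:
    """Return True if the token's HS256 signature matches the given secret."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        actual = _b64url_decode(signature_b64)
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    except ValueError:
        # Wrong number of segments, bad base64, or non-ASCII input.
        return False
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return hmac.compare_digest(expected, actual)
```

Running `signed_with_secret(token, os.environ["JWT_SECRET"])` on each service should give the same answer everywhere; a `False` on one instance suggests that instance holds a different secret.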
    
  5. Verify client settings for JWT expiration:
    • Definable’s JWT expiration is configured in the settings.py file
    • The relevant setting is jwt_expire_minutes
    • The token creation uses this setting:
      # From utils/auth_util.py
      def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
          to_encode = data.copy()
          if expires_delta:
              expire = datetime.now(timezone.utc) + expires_delta
          else:
              expire = datetime.now(timezone.utc) + timedelta(minutes=15)  # Default fallback
          to_encode.update({"exp": expire})
          encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm="HS256")
          return encoded_jwt
      
Symptoms:
  • Error: “Invalid authorization” or “Missing authentication credentials”
  • Authentication works in some tools (e.g., Postman) but not in your code
Solutions:
  1. Check header format against JWTBearer implementation:
    # Correct format for Definable's JWTBearer middleware
    headers = {
        "Authorization": f"Bearer {token}"  # Note the space after "Bearer"
    }
    
    # Incorrect formats that will fail with Definable's JWTBearer:
    # headers = {"Authorization": token}  # Missing 'Bearer' prefix
    # headers = {"Authorization": f"bearer {token}"}  # Case sensitive, must be 'Bearer'
    # headers = {"Authorization": f"Bearer{token}"}   # Missing space after 'Bearer'
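
To test header strings before sending them, the rules listed above can be mirrored in a small parser. This is only a sketch of the strictness described here (exact `Bearer` scheme, one separating space, non-empty token); `extract_bearer_token` is a hypothetical helper, not Definable's JWTBearer code.

```python
from typing import Optional


def extract_bearer_token(header_value: str) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if the format is off."""
    scheme, separator, token = header_value.partition(" ")
    # The scheme is compared case-sensitively, matching the note above.
    if scheme != "Bearer" or not separator or not token.strip():
        return None
    return token.strip()
```

A `None` result for a header your client produces points at one of the incorrect formats listed above.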
    
  2. Verify token inclusion in all requests:
    // Using a request interceptor (axios example)
    axios.interceptors.request.use(
        config => {
            const token = localStorage.getItem('token');
            if (token) {
                config.headers.Authorization = `Bearer ${token}`;
            }
            return config;
        },
        error => Promise.reject(error)
    );
    
  3. Check request object in JWTBearer middleware:
    • Definable’s JWTBearer middleware extracts the token from either an HTTP request or a WebSocket
    • For HTTP, it checks credentials.scheme == "Bearer"
    • For WebSocket, it checks websocket.query_params.get("token")
    # For debugging JWT extraction in your middleware:
    def debug_token_extraction(request=None, websocket=None):
        if request:
            auth_header = request.headers.get("Authorization", "")
            print(f"Auth header: {auth_header}")
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]  # Remove 'Bearer ' prefix
                print(f"Extracted token: {token[:10]}...")
            else:
                print("Invalid auth header format")
        elif websocket:
            token = websocket.query_params.get("token")
            print(f"WebSocket token query param: {token[:10] if token else 'None'}...")
        else:
            print("No request or websocket provided")
    
  4. Check for header modifications in proxies:
    • Some proxies may strip or modify authentication headers
    • Verify headers reach the API intact using debugging tools
    curl -v -H "Authorization: Bearer your-token" https://api.example.com/endpoint
    
Symptoms:
  • Unexpected logins from unknown locations
  • Security alerts about unauthorized access
  • Token becoming invalid unexpectedly
Solutions:
  1. Understand Definable’s token content:
    # Definable JWT payload typically contains:
    # - id: The user's UUID
    # - exp: Expiration timestamp
    
    # When accessed with RBAC middleware, additional claims are added:
    # - org_id: Current organization context
    # - required_permission: The permission used for the request
    # - role: User's role name
    # - role_level: User's role hierarchy level
    # - role_id: User's role UUID
    # - permissions: List of permissions the user has
    
    # Decode token to check contents:
    import jwt
    from datetime import datetime
    
    def inspect_token(token):
        try:
            # Decode without verification for inspection
            payload = jwt.decode(token, options={"verify_signature": False})
            print(f"User ID: {payload.get('id')}")
            print(f"Expiration: {datetime.fromtimestamp(payload.get('exp')).isoformat()}")
            print(f"Organization ID: {payload.get('org_id', 'Not present')}")
            print(f"Role: {payload.get('role', 'Not present')}")
            return payload
        except Exception as e:
            print(f"Failed to decode token: {e}")
            return None
    
  2. Implement client-side security measures:
    // Since Definable doesn't implement token revocation by default,
    // implement these client-side security measures:
    
    // 1. Store token securely
    function securelyStoreToken(token) {
      // Use HttpOnly cookies in production if possible
      // For SPAs, localStorage is often used despite security concerns
      localStorage.setItem('access_token', token);
      
      // Also store token issue time for tracking
      localStorage.setItem('token_issued_at', Date.now().toString());
    }
    
    // 2. Implement logout across tabs
    window.addEventListener('storage', (event) => {
      if (event.key === 'access_token' && !event.newValue) {
        // Token was removed in another tab, logout in this tab too
        window.location.href = '/login';
      }
    });
    
    // 3. Clear token on suspicious activity
    function detectSuspiciousActivity() {
      const issuedAt = parseInt(localStorage.getItem('token_issued_at') || '0');
      const maxSessionTime = 12 * 60 * 60 * 1000; // 12 hours
      
      if (Date.now() - issuedAt > maxSessionTime) {
        // Session too long, force re-auth
        logout();
      }
    }
    
  3. Configure JWT validity period:
    • Definable uses settings.jwt_expire_minutes to control token lifetime
    • Check your .env file and update JWT_EXPIRE_MINUTES to an appropriate value:
    # Recommended values based on security requirements:
    # High security: 15-30 minutes
    # Medium security: 1-2 hours
    # Low security: 8-24 hours
    JWT_EXPIRE_MINUTES=60
    
  4. Additional security for production:
    • Definable doesn’t include fingerprinting or IP validation by default
    • For production, consider implementing an enhanced authentication middleware:
    # Example of enhanced JWT validation middleware
    async def enhanced_jwt_validation(
        request: Request,
        token_payload: dict = Depends(JWTBearer()),
        session: AsyncSession = Depends(get_db)
    ):
        user_id = token_payload.get("id")
        # Check if user is still active
        user = await session.get(UserModel, UUID(user_id))
        if not user or not user.is_active:
            raise HTTPException(status_code=403, detail="User account is deactivated")
        
        # Add last activity tracking
        user.last_active = datetime.now()
        session.add(user)
        await session.commit()
        
        return token_payload
    

RBAC Permission Issues

Symptoms:
  • Error: “Access denied. Required: :” with status code 403
  • User can access some endpoints but not others
  • Permissions work for some users but not others
Solutions:
  1. Verify user roles in organization context:
    # Definable's RBAC implementation is organization-specific
    # Check current user's role in the organization:
    response = requests.get(
        f"{API_URL}/roles/list_roles?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if response.status_code == 200:
        roles = response.json()
        print(f"Available roles: {[role['name'] for role in roles]}")
        
    # Get user's permissions
    response = requests.get(
        f"{API_URL}/users/me?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if response.status_code == 200:
        user_data = response.json()
        role = user_data.get("role")
        print(f"User role: {role}")
    
  2. Understand Definable’s RBAC middleware:
    • Definable’s RBAC middleware checks for specific resource and action permissions
    • Routes are protected with: user: dict = Depends(RBAC("resource", "action"))
    • Permissions follow resource:action pattern, with wildcard support (*)
    # Example of Definable's RBAC middleware usage
    @app.get("/api/protected")
    async def protected_route(user: dict = Depends(RBAC("kb", "read"))):
        # If execution reaches here, the user is authorized
        return {"message": "Access granted", "user_id": user["id"]}
    
    # For wildcards, Definable supports patterns like:
    # - "*" (any resource and any action)
    # - "kb:*" (any action on kb resource)
    # - "*:read" (read action on any resource)
    # - "kb:r*" (any action starting with "r" on kb resource)
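
To see how these patterns behave against a list of granted permissions, a quick approximation can be built on Python's `fnmatch` module. This is a stand-in, not Definable's matcher: `fnmatchcase` also gives special meaning to `?` and `[...]`, which the patterns above do not use, and `has_permission` is a name chosen for illustration.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def has_permission(granted: Iterable[str], resource: str, action: str) -> bool:
    """True if any granted pattern matches the required 'resource:action' string."""
    required = f"{resource}:{action}"
    # fnmatchcase(name, pattern) does a case-sensitive shell-style match.
    return any(fnmatchcase(required, pattern) for pattern in granted)
```

For example, `has_permission(["kb:r*"], "kb", "read")` is true, while `has_permission(["kb:w*"], "kb", "read")` is false.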
    
  3. Check organization membership status:
    • Definable’s RBAC checks OrganizationMemberModel.status == "active"
    • If the user is not active in the organization, they’ll get a 403 error
    # Check user's status in organization
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id, 
            OrganizationMemberModel.organization_id == org_id
        )
    )
    members = await session.execute(member_query)
    member = members.unique().scalar_one_or_none()
    
    if not member:
        print("User is not a member of this organization")
    elif member.status != "active":
        print(f"User's status in organization is: {member.status}")
    else:
        print("User is an active member of this organization")
    
  4. Add missing permissions or update role:
    # Assign a role to a user (admin operation)
    response = requests.post(
        f"{API_URL}/roles/{org_id}/users/{user_id}/roles",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={"role_id": "editor_role_id"}
    )
    
    # Create a new permission
    response = requests.post(
        f"{API_URL}/roles/permission",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={
            "resource": "kb",
            "action": "read",
            "description": "Read knowledge base"
        }
    )
    
  5. Debug Definable’s RBAC wildcard matching:
    # Definable uses a custom wildcard matcher in the RBAC class
    # You can recreate it for testing:
    def check_wildcard_match(permission_value: str, required_value: str) -> bool:
        """Check if permission matches required value with wildcard support."""
        if permission_value == "*":
            return True
        if "*" not in permission_value:
            return permission_value == required_value
    
        # Handle pattern matching with wildcards
        pattern_parts = permission_value.split("*")
        value = required_value
    
        # Check prefix
        if pattern_parts[0] and not required_value.startswith(pattern_parts[0]):
            return False
    
        # Check suffix
        if pattern_parts[-1] and not required_value.endswith(pattern_parts[-1]):
            return False
    
        # Check middle parts
        for part in pattern_parts[1:-1]:
            if part not in required_value:
                return False
            # Move past the matched part for next check
            value = value[value.find(part) + len(part):]
    
        return True
    
    # Test it:
    print(check_wildcard_match("kb:*", "kb:read"))  # True
    print(check_wildcard_match("*:read", "kb:read"))  # True
    print(check_wildcard_match("kb:r*", "kb:read"))  # True
    print(check_wildcard_match("kb:w*", "kb:read"))  # False
    
Symptoms:
  • User doesn’t see expected role in their profile
  • Role assignments don’t persist
  • User loses access after logging out and back in
Solutions:
  1. Verify role was correctly assigned in Definable’s database:
    # Definable stores role assignments in OrganizationMemberModel
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    
    # Check user's current role in organization
    response = requests.get(
        f"{API_URL}/organizations/{org_id}/users/{user_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    
    if response.status_code == 200:
        user_data = response.json()
        print(f"User role in this organization: {user_data.get('role', {}).get('name')}")
    
  2. Check role existence and permissions:
    # Get role definition using Definable's roles service
    response = requests.get(
        f"{API_URL}/roles/{role_id}?org_id={org_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    
    if response.status_code == 200:
        role_data = response.json()
        print(f"Role name: {role_data.get('name')}")
        print(f"Role hierarchy level: {role_data.get('hierarchy_level')}")
        print(f"Role is system role: {role_data.get('is_system_role')}")
        print(f"Role permissions: {role_data.get('permissions', [])}")
    
  3. Check for hierarchy level conflicts:
    # Definable uses hierarchy_level to determine role precedence
    # Higher numbers = higher privilege
    # RoleService._validate_hierarchy_level ensures hierarchy levels are unique
    
    # Get all roles in the organization to check hierarchy
    response = requests.get(
        f"{API_URL}/roles/list_roles?org_id={org_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    
    if response.status_code == 200:
        roles = response.json()
        # Sort by hierarchy level
        sorted_roles = sorted(roles, key=lambda r: r.get('hierarchy_level', 0), reverse=True)
        for role in sorted_roles:
            print(f"Role: {role.get('name')}, Level: {role.get('hierarchy_level')}")
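
Since hierarchy levels are meant to be unique, it can help to flag any level shared by more than one role in the list returned above. A minimal sketch, assuming each role is a dict with the `name` and `hierarchy_level` keys used in the example; `find_level_conflicts` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List


def find_level_conflicts(roles: List[dict]) -> Dict[int, List[str]]:
    """Map each hierarchy_level shared by more than one role to those role names."""
    names_by_level = defaultdict(list)
    for role in roles:
        names_by_level[role.get("hierarchy_level")].append(role.get("name"))
    # Keep only the levels that more than one role claims.
    return {level: names for level, names in names_by_level.items() if len(names) > 1}
```

An empty result means no two roles in the response share a level.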
    
  4. Recreate role assignment:
    # To resolve persistent issues, recreate the role assignment
    
    # 1. First get the user's current organization member record
    member_query = select(OrganizationMemberModel).where(
        and_(
            OrganizationMemberModel.user_id == user_id,
            OrganizationMemberModel.organization_id == org_id
        )
    )
    
    # 2. Update the role_id
    # Using a SQLAlchemy update statement (example); execute it with your session
    update_query = (
        update(OrganizationMemberModel)
        .where(
            and_(
                OrganizationMemberModel.user_id == user_id,
                OrganizationMemberModel.organization_id == org_id
            )
        )
        .values(role_id=new_role_id, status="active")
    )
    
    # 3. The user needs a new token after role changes
    print("Role updated. User must log out and log in again for changes to take effect.")
    
Symptoms:
  • User with admin role cannot perform expected actions
  • Permissions don’t apply across organizations
  • Required org_id parameter missing errors
Solutions:
  1. Understand Definable’s organization-based permissions:
    # Definable's RBAC is organization-scoped
    # The org_id query parameter is required for most endpoints
    
    # Correct request with org_id:
    response = requests.get(
        f"{API_URL}/some-endpoint?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    # For WebSockets:
    websocket = WebSocket(f"wss://{API_URL}/ws/connect?token={token}&org_id={org_id}")
    
    # RBAC middleware checks:
    # 1. User has org_id in request
    # 2. User is an active member of that organization
    # 3. User has appropriate permissions in that organization
    
  2. Debug missing organization context:
    # If you get "Invalid org id" errors:
    
    # For HTTP requests, check:
    if request:
        org_id = request.query_params.get("org_id")
        print(f"Request org_id: {org_id}")
        
    # For WebSockets, check:
    elif websocket:
        org_id = websocket.query_params.get("org_id")
        print(f"WebSocket org_id: {org_id}")
        
    # If org_id is None, you need to add it to your request
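
For Python clients, one way to avoid forgetting the parameter is to append it when building the URL. The sketch below uses only `urllib.parse`; `with_org_id` is an illustrative helper name, and it leaves any `org_id` already present in the URL untouched.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_org_id(url: str, org_id: str) -> str:
    """Return url with an org_id query parameter, keeping an existing one as-is."""
    parts = urlsplit(url)
    query = parse_qsl(parts.query, keep_blank_values=True)
    if not any(key == "org_id" for key, _ in query):
        query.append(("org_id", org_id))
    return urlunsplit(parts._replace(query=urlencode(query)))
```

The same helper can be applied to WebSocket URLs, since the query string is handled the same way.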
    
  3. Verify permission scope in Definable:
    # In Definable's implementation, permissions are tied to:
    # 1. A user's role in a specific organization
    # 2. The PermissionModel defines resource & action
    # 3. RolePermissionModel links roles to permissions
    
    # Query to check what permissions are assigned to a role:
    role_perms_query = (
        select(RolePermissionModel, PermissionModel)
        .join(PermissionModel, RolePermissionModel.permission_id == PermissionModel.id)
        .where(RolePermissionModel.role_id == role_id)
    )
    
    # List a user's permissions in an organization:
    async def list_user_permissions(user_id, org_id, session):
        # Get user's role in organization
        member_query = select(OrganizationMemberModel).where(
            and_(
                OrganizationMemberModel.user_id == user_id,
                OrganizationMemberModel.organization_id == org_id,
                OrganizationMemberModel.status == "active"
            )
        )
        member = await session.execute(member_query)
        member = member.scalar_one_or_none()
        
        if not member:
            return []
            
        # Get permissions for this role
        role_perms_query = (
            select(PermissionModel)
            .join(RolePermissionModel, PermissionModel.id == RolePermissionModel.permission_id)
            .where(RolePermissionModel.role_id == member.role_id)
        )
        perms = await session.execute(role_perms_query)
        return list(perms.scalars().all())
    
  4. Handle multi-organization scenarios:
    // For clients working with multiple organizations:
    
    // Store current organization context
    function setCurrentOrganization(orgId) {
      localStorage.setItem('current_org_id', orgId);
    }
    
    // Add organization context to all API requests
    axios.interceptors.request.use(config => {
      const orgId = localStorage.getItem('current_org_id');
      if (orgId) {
        // For GET requests
        if (!config.params) {
          config.params = {};
        }
        if (!config.params.org_id) {
          config.params.org_id = orgId;
        }
        
        // For POST/PUT requests with JSON body
        if (config.data && typeof config.data === 'object' && !config.data.org_id) {
          config.data.org_id = orgId;
        }
      }
      
      return config;
    });
    

Login and Account Issues

Symptoms:
  • Unable to log in with valid credentials
  • Error message: “Incorrect username or password”
  • Persistent login failures
Solutions:
  1. Verify credentials against Definable’s auth service:
    # Definable's AuthService.post_login uses verify_password to check credentials
    
    # Example login request:
    response = requests.post(
        f"{API_URL}/auth/login",
        json={
            "email": "user@example.com",
            "password": "password123"
        }
    )
    
    if response.status_code != 200:
        error_data = response.json()
        print(f"Login error: {error_data.get('detail')}")
    else:
        token_data = response.json()
        print(f"Login successful. Token obtained: {token_data.get('access_token')[:10]}...")
    
  2. Check password verification in Definable:
    # Definable uses verify_password from auth_util:
    def verify_password(plain_password, hashed_password):
        """Verify a password against a hash."""
        return pwd_context.verify(plain_password, hashed_password)
    
    # Passwords are hashed with bcrypt through passlib's CryptContext
    # If the hashing algorithm was changed, older passwords may fail verification
    
  3. Check for account status issues:
    # Definable checks user status during login
    # Check user status in the database:
    user_query = select(UserModel).where(UserModel.email == email)
    user = await session.execute(user_query)
    user = user.scalar_one_or_none()
    
    if not user:
        print("User not found in database")
    else:
        print(f"User status: {user.status}")
        # Status can be "active", "inactive", "pending", etc.
        # Only "active" users can log in
    
  4. Check for email verification requirements:
    # Definable may require email verification before login
    # Check the email verification status:
    user_query = select(UserModel).where(UserModel.email == email)
    user = await session.execute(user_query)
    user = user.scalar_one_or_none()
    
    if user and not user.email_verified:
        print("Email not verified. Verification required before login.")
        
        # Resend verification email if needed:
        response = requests.post(
            f"{API_URL}/auth/resend-verification",
            json={"email": email}
        )
    
  5. Handle login lockouts:
    // Definable's implementation may have rate limiting on login attempts
    // Check for specific error messages indicating lockout
    
    // If using RateLimiter middleware from ratelimit.py:
    // Typical lockout message: "Rate limit exceeded"
    
    // To reset a locked account (admin operation):
    // 1. Reset the user's rate limit counter in Redis
    // 2. Or enable a temporary bypass for that user
    
    // Example client-side exponential backoff:
    async function loginWithBackoff(credentials, maxRetries = 5) {
      let retryCount = 0;
      let delay = 1000; // Start with 1 second
      
      while (retryCount < maxRetries) {
        try {
          const response = await fetch(`${API_URL}/auth/login`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(credentials)
          });
          
          if (response.ok) {
            return await response.json();
          }
          
          const error = await response.json();
          if (error.detail.includes("Rate limit exceeded")) {
            console.log(`Rate limited. Waiting ${delay/1000} seconds before retry.`);
            await new Promise(resolve => setTimeout(resolve, delay));
            delay *= 2; // Exponential backoff
            retryCount++;
          } else {
            // Other error, not rate limiting
            throw new Error(error.detail);
          }
        } catch (err) {
          console.error("Login error:", err);
          throw err;
        }
      }
      
      throw new Error("Max retries exceeded for login");
    }
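
For Python clients, the same backoff idea might look like the sketch below. It is transport-agnostic: `attempt` is a hypothetical callable you supply that performs one login request and returns `(status_code, json_body)`, and the `sleep` parameter exists only so the waits can be replaced in tests. The "Rate limit exceeded" text is taken from the message mentioned above.

```python
import time
from typing import Callable, Tuple


def login_with_backoff(
    attempt: Callable[[], Tuple[int, dict]],
    max_retries: int = 5,
    initial_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call attempt() until it succeeds, doubling the wait after each rate-limit reply."""
    delay = initial_delay
    for _ in range(max_retries):
        status, body = attempt()
        if status == 200:
            return body
        detail = str(body.get("detail", ""))
        if "Rate limit exceeded" not in detail:
            # Any other failure is not retried.
            raise RuntimeError(detail or f"login failed with status {status}")
        sleep(delay)
        delay *= 2  # Exponential backoff
    raise RuntimeError("Max retries exceeded for login")
```

With `requests`, `attempt` could be a small lambda that posts the credentials to the login endpoint and returns `(response.status_code, response.json())`.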
    
Symptoms:
  • Cannot accept invitation
  • Error in signup process
  • Invitation links expired or invalid
Solutions:
  1. Verify invitation token:
    # Definable handles invitations in AuthService.post_signup_invite and get_signup_invite
    
    # Check invitation status:
    response = requests.get(
        f"{API_URL}/auth/signup-invite?token={invitation_token}"
    )
    
    if response.status_code != 200:
        error_data = response.json()
        print(f"Invitation error: {error_data.get('detail')}")
    else:
        invite_data = response.json()
        print(f"Invitation valid for: {invite_data.get('email')}")
        print(f"Organization: {invite_data.get('organization', {}).get('name')}")
    
  2. Debug invitation token issues:
    # Definable validates invite tokens by:
    # 1. Checking if invite exists in the database
    # 2. Verifying it's not expired
    # 3. Ensuring it's not already used
    
    # Common issues:
    # - Token expired: invites have expiration timestamps
    # - Token already used: invite can be used only once
    # - Email mismatch: token is tied to a specific email
    
    # Check invitation in database:
    invite_query = select(InvitationModel).where(InvitationModel.token == token)
    invite = await session.execute(invite_query)
    invite = invite.scalar_one_or_none()
    
    if not invite:
        print("Invitation not found")
    elif invite.used:
        print("Invitation already used")
    elif invite.expires_at < datetime.utcnow():
        print("Invitation expired")
    else:
        print("Invitation valid")
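
The checks above, plus the email-mismatch case from the list of common issues, can be gathered into one function that is easy to test without a database. This is a sketch under assumptions: the `Invite` dataclass is a stand-in for the fields of InvitationModel used here, the `email` field is assumed from the note that a token is tied to a specific email, and `expires_at` is assumed to be timezone-aware.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Invite:
    # Stand-in for the invitation record; field names are assumptions.
    email: str
    used: bool
    expires_at: datetime


def invite_problem(invite: Optional[Invite], email: str,
                   now: Optional[datetime] = None) -> Optional[str]:
    """Return a reason the invite cannot be used, or None when it looks valid."""
    now = now or datetime.now(timezone.utc)
    if invite is None:
        return "not found"
    if invite.used:
        return "already used"
    if invite.expires_at < now:
        return "expired"
    if invite.email.lower() != email.lower():
        return "email mismatch"
    return None
```

Returning a reason string rather than printing makes it straightforward to surface the specific failure to whoever is debugging the invitation.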
    
  3. Resend invitation if needed:
    # Admin can resend invitation
    response = requests.post(
        f"{API_URL}/auth/invite",
        headers={"Authorization": f"Bearer {admin_token}"},
        json={
            "email": "user@example.com",
            "org_id": organization_id,
            "role_id": role_id
        }
    )
    
    if response.status_code == 200:
        print("New invitation sent successfully")
    
  4. Complete signup with invitation:
    # Complete signup using invitation
    response = requests.post(
        f"{API_URL}/auth/signup-invite",
        json={
            "email": "user@example.com",
            "password": "securePassword123",
            "first_name": "John",
            "last_name": "Doe",
            "token": invitation_token
        }
    )
    
    if response.status_code == 201:
        print("Signup with invitation successful")
        user_data = response.json()
        print(f"User created with ID: {user_data.get('id')}")
    else:
        error_data = response.json()
        print(f"Signup error: {error_data.get('detail')}")
    
Symptoms:
  • Cannot update profile information
  • Password reset fails
  • Email verification issues
Solutions:
  1. Debug password reset flow:
    # Definable's password reset flow:
    # 1. Request password reset token
    response = requests.post(
        f"{API_URL}/auth/forgot-password",
        json={"email": "user@example.com"}
    )
    
    if response.status_code == 200:
        print("Password reset email sent")
        
    # 2. Reset password with token
    response = requests.post(
        f"{API_URL}/auth/reset-password",
        json={
            "token": "reset_token_from_email",
            "password": "newSecurePassword123"
        }
    )
    
    if response.status_code == 200:
        print("Password reset successful")
    else:
        error_data = response.json()
        print(f"Password reset error: {error_data.get('detail')}")
    
  2. Check email verification status:
    # Definable tracks email verification status
    
    # Get user profile to check verification status
    response = requests.get(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if response.status_code == 200:
        user_data = response.json()
        print(f"Email verified: {user_data.get('email_verified', False)}")
        
    # Verify email with token:
    response = requests.post(
        f"{API_URL}/auth/verify-email",
        json={"token": "verification_token_from_email"}
    )
    
    if response.status_code == 200:
        print("Email verification successful")
    
  3. Update user profile:
    # Update user profile in Definable
    response = requests.put(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "first_name": "John",
            "last_name": "Smith",
            # Other fields as needed
        }
    )
    
    if response.status_code == 200:
        print("Profile updated successfully")
    else:
        error_data = response.json()
        print(f"Profile update error: {error_data.get('detail')}")
    
  4. Handle account deletion:
    # Definable process for account deactivation
    
    # Deactivate (soft delete) account:
    response = requests.delete(
        f"{API_URL}/auth/me",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if response.status_code == 200:
        print("Account deactivated successfully")
    
    # Administrator hard delete (permanent)
    response = requests.delete(
        f"{API_URL}/users/{user_id}",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    
    if response.status_code == 204:
        print("User permanently deleted")
    

WebSocket Authentication Issues

Symptoms:
  • WebSocket connection is rejected
  • Error: “Not authenticated” or “Invalid token”
  • Connection drops immediately after establishing
Solutions:
  1. Verify WebSocket connection with token:
    // Definable's WebSocket authentication requires token in query parameters
    // Correct format:
    const socket = new WebSocket(`wss://${API_URL}/ws/connect?token=${jwt_token}&org_id=${org_id}`);
    
    socket.onopen = () => {
      console.log("WebSocket connection established");
    };
    
    socket.onerror = (error) => {
      console.error("WebSocket error:", error);
    };
    
    socket.onclose = (event) => {
      console.log(`WebSocket closed with code: ${event.code}, reason: ${event.reason}`);
    };
    
  2. Debug WebSocket auth errors:
    # Definable's JWTBearer validates WebSocket connections
    # For WebSockets, validate_socket_token is called
    
    # Server-side debugging:
    async def authenticate_ws_connection(websocket: WebSocket):
        # Extract token from query params
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008, reason="Missing token")
            return None
            
        # Extract org_id from query params
        org_id = websocket.query_params.get("org_id")
        if not org_id:
            await websocket.close(code=1008, reason="Missing org_id")
            return None
            
        # Validate JWT token
        try:
            payload = jwt.decode(
                token, 
                settings.jwt_secret, 
                algorithms=["HS256"]
            )
            user_id = payload.get("id")  # Definable's payload stores the user's UUID under "id"
            
            # Further validation as needed
            return payload
        except Exception as e:
            await websocket.close(code=1008, reason=f"Invalid token: {str(e)}")
            return None
    
  3. Handle WebSocket reconnection with token refresh:
    // Client-side token refresh and reconnection
    class AuthenticatedWebSocket {
      constructor(baseUrl, getToken, getOrgId) {
        this.baseUrl = baseUrl;
        this.getToken = getToken;
        this.getOrgId = getOrgId;
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.listeners = {
          message: [],
          error: [],
          close: [],
          open: []
        };
      }
      
      connect() {
        const token = this.getToken();
        const orgId = this.getOrgId();
        
        if (!token) {
          console.error("No token available for WebSocket connection");
          return;
        }
        
        if (!orgId) {
          console.error("No organization ID available for WebSocket connection");
          return;
        }
        
        // Definable's WebSocket endpoint with required auth parameters (URL-encoded)
        const query = `token=${encodeURIComponent(token)}&org_id=${encodeURIComponent(orgId)}`;
        this.socket = new WebSocket(`${this.baseUrl}/ws/connect?${query}`);
        
        this.socket.onopen = (event) => {
          console.log("WebSocket connected");
          this.reconnectAttempts = 0;
          this.listeners.open.forEach(listener => listener(event));
        };
        
        this.socket.onmessage = (event) => {
          this.listeners.message.forEach(listener => listener(event));
        };
        
        this.socket.onerror = (event) => {
          this.listeners.error.forEach(listener => listener(event));
        };
        
        this.socket.onclose = (event) => {
          this.listeners.close.forEach(listener => listener(event));
          
          // Handle authentication failures (1008 = Policy Violation)
          if (event.code === 1008 && event.reason.includes("token")) {
            console.error("WebSocket authentication failed:", event.reason);
            // Try to refresh token before reconnecting
            this.refreshTokenAndReconnect();
          } else if (this.reconnectAttempts < this.maxReconnectAttempts) {
            // Handle other disconnects; reconnect() applies the backoff delay itself
            this.reconnect();
          }
        };
      }
      
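      async refreshTokenAndReconnect() {
        try {
          // refreshAuthToken() is a placeholder for your application's own logic.
          // Definable doesn't use refresh tokens by default, so this usually
          // means sending the user through login again to obtain a new token.
          await refreshAuthToken();
          this.reconnect();
        } catch (error) {
          console.error("Failed to refresh token:", error);
        }
      }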
      
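      reconnect() {
        // Stop once the retry budget is used up (this also bounds the auth-failure path)
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
          console.error("Max WebSocket reconnect attempts reached");
          return;
        }
        
        this.reconnectAttempts++;
        // Exponential backoff: 1s, 2s, 4s, ... with the default reconnectDelay
        const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
        console.log(`Attempting to reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);
        
        setTimeout(() => {
          this.connect();
        }, delay);
      }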
      
      // Event listener methods
      on(event, callback) {
        if (this.listeners[event]) {
          this.listeners[event].push(callback);
        }
        return this;
      }
      
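      close() {
        // Use up the retry budget so the onclose handler does not reconnect
        // after an intentional close
        this.reconnectAttempts = this.maxReconnectAttempts;
        if (this.socket) {
          this.socket.close();
        }
      }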
      
      send(data) {
        if (this.socket && this.socket.readyState === WebSocket.OPEN) {
          this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
        } else {
          console.error("WebSocket not connected");
        }
      }
    }
    
    // Usage:
    const ws = new AuthenticatedWebSocket(
      'wss://api.definable.ai',
      () => localStorage.getItem('access_token'),
      () => localStorage.getItem('current_org_id')
    );
    
    ws.on('open', () => console.log('Connected!'))
      .on('message', (event) => console.log('Received:', event.data))
      .connect();
    
  4. Check RBAC permissions for WebSocket connections:
    # Definable's WebSocketService uses RBAC for permission checking
    # WebSocket routes are protected with RBAC dependency
    
    # In WebSocketService.ws_connect, RBAC dependency is used:
    @websocket_endpoint.websocket("/connect")
    async def ws_connect(
        websocket: WebSocket,
        user: dict = Depends(RBAC("ws", "connect"))
    ):
        # If execution reaches here, user is authorized
        ...
        
    # Client debugging:
    # If WebSocket connection fails with 403, check user permissions
    import requests
    
    response = requests.get(
        f"{API_URL}/roles/list_permissions?org_id={org_id}",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    if response.status_code == 200:
        permissions = response.json()
        # Look for "ws:connect" permission
        has_ws_permission = any(
            p.get("resource") == "ws" and p.get("action") == "connect"
            for p in permissions
        )
        print(f"Has WebSocket connect permission: {has_ws_permission}")
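
Before opening a WebSocket, it can help to rule out the two most common causes of a rejected connection: a missing parameter and an expired token. The following is a minimal sketch using only the standard library. The helper names `decode_jwt_payload` and `build_ws_url` and the `min_ttl_seconds` parameter are hypothetical and not part of Definable; the `/ws/connect` path and the `token` and `org_id` query parameters come from the examples above. The payload is decoded without signature verification, so this is a client-side sanity check only, and the server still performs the real validation.

```python
import base64
import json
import time
from urllib.parse import urlencode


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Token must have header.payload.signature format")
    payload_b64 = parts[1]
    # JWT segments are unpadded base64url; restore the padding before decoding
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def build_ws_url(base_url: str, token: str, org_id: str,
                 min_ttl_seconds: int = 60) -> str:
    """Return the WebSocket URL, or raise ValueError if the connection
    would be rejected for an obvious reason (hypothetical helper)."""
    if not token:
        raise ValueError("Missing token")
    if not org_id:
        raise ValueError("Missing org_id")

    # Refuse to connect with a token that is expired or about to expire
    exp = decode_jwt_payload(token).get("exp", 0)
    if exp - time.time() < min_ttl_seconds:
        raise ValueError("Token is expired or about to expire; log in again")

    # urlencode escapes both values so special characters survive the query string
    query = urlencode({"token": token, "org_id": org_id})
    return f"{base_url}/ws/connect?{query}"
```

Calling `build_ws_url("wss://api.definable.ai", token, org_id)` either returns a URL that is ready to pass to a WebSocket client or raises a `ValueError` that names the problem, which is usually easier to act on than a bare connection failure.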
    
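The reconnection logic in solution 3 boils down to one decision per close event: re-authenticate, retry with backoff, or stop. The sketch below expresses that decision as a pure Python function so it can be unit tested without a live socket. The function name `next_action` and its return values are hypothetical. The defaults of 5 attempts and a 1 second base delay mirror the JavaScript class above, and treating a 1008 close that does not mention the token (for example "Missing org_id") as non-retryable is an assumption of this sketch, not documented Definable behavior.

```python
from typing import Tuple

POLICY_VIOLATION = 1008  # close code used for auth failures in the server example


def next_action(close_code: int, reason: str, attempts: int,
                max_attempts: int = 5,
                base_delay: float = 1.0) -> Tuple[str, float]:
    """Decide what to do after a WebSocket close.

    `attempts` is the number of reconnects already tried.
    Returns (action, delay_seconds), where action is one of
    "reauthenticate", "reconnect", or "give_up".
    """
    if close_code == POLICY_VIOLATION:
        if "token" in reason.lower():
            # Missing or invalid token: a new token is needed before retrying
            return ("reauthenticate", 0.0)
        # Other policy violations will fail the same way again (assumption)
        return ("give_up", 0.0)

    if attempts >= max_attempts:
        return ("give_up", 0.0)

    # Exponential backoff: 1s, 2s, 4s, 8s, 16s with the defaults
    return ("reconnect", base_delay * 2 ** attempts)
```

Keeping the decision separate from the socket handling makes it straightforward to check edge cases, such as the retry limit, in isolation.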

Next Steps

If you've resolved your authentication issues, consider reviewing the related guides.

If you're still experiencing authentication problems, check the API documentation or contact the development team for assistance.