Documentation Index
Fetch the complete documentation index at: https://docs.zyeta.io/llms.txt
Use this file to discover all available pages before exploring further.
This guide explains how to use Definable's security dependencies to protect your API endpoints with authentication and role-based access control (RBAC). These FastAPI dependencies provide a clean, declarative way to enforce security policies across your application.
Overview
Definable provides two main security dependencies:
- JWTBearer: Validates JWT tokens (both Stytch session tokens and API keys)
- RBAC: Enforces role-based permissions on protected resources
These dependencies work together to provide a complete authentication and authorization solution.
Architecture
JWTBearer Dependency
The JWTBearer class validates JWT tokens and extracts user identity.
Location
File: src/dependencies/security.py
Implementation
from fastapi import Depends, HTTPException, Request, WebSocket
from fastapi.security import HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
class JWTBearer(HTTPBearer):
    """
    JWT Bearer authentication dependency for HTTP routes and WebSockets.

    The bearer token is verified with
    ``stytch_base.authenticate_user_with_jkws`` and the token's ``sub`` claim
    is matched against ``UserModel.stytch_id``.

    NOTE(review): the previous docstring also listed internal API keys signed
    with JWT_SECRET as accepted tokens; no such code path is visible in this
    class - confirm against the real implementation.

    Returns user context with stytch_user_id and internal user id.
    """

    async def __call__(
        self,
        # NOTE(review): kept as bare Request/WebSocket annotations with None
        # defaults. FastAPI appears to inject these special parameters based
        # on the annotated class, so switching to Optional[...] may stop the
        # injection - verify before changing.
        request: Request = None,
        websocket: WebSocket = None,
        session: AsyncSession = Depends(get_db),
    ) -> dict:
        """
        Authenticate the caller of an HTTP request or a WebSocket handshake.

        Args:
            request: Incoming HTTP request; the token is read from the
                Authorization header by ``HTTPBearer``.
            websocket: Incoming WebSocket; the token is read from the
                ``token`` query parameter.
            session: Database session used to look up the user.

        Returns:
            ``{"stytch_user_id": <sub claim>, "id": <internal user id as str>}``

        Raises:
            HTTPException: 403 when credentials are missing or malformed, the
                token fails verification, or no matching user exists.
        """
        # HTTP Request authentication
        if request is not None:
            credentials = await super().__call__(request)
            if not credentials or credentials.scheme != "Bearer":
                raise HTTPException(
                    status_code=403,
                    detail="Invalid authorization"
                )
            return await self._authenticate_token(
                credentials.credentials, session
            )

        # WebSocket authentication
        if websocket is not None:
            token = websocket.query_params.get("token")
            if not token:
                raise HTTPException(
                    status_code=403,
                    detail="Invalid authorization"
                )
            return await self._authenticate_token(token, session)

        # Neither connection object was supplied. Fail closed instead of
        # implicitly returning None, which would only surface later as a
        # TypeError when a caller indexes the "user" dict.
        raise HTTPException(
            status_code=403,
            detail="Invalid authorization"
        )

    async def _authenticate_token(
        self, token: str, session: AsyncSession
    ) -> dict:
        """Verify *token* and map its ``sub`` claim to an internal user."""
        # Verify JWT using Stytch JWKS
        response = await stytch_base.authenticate_user_with_jkws(token)
        if not response.success:
            raise HTTPException(
                status_code=403,
                detail="Access denied"
            )

        # Look up user by stytch_id
        stytch_user_id = response.data["sub"]
        user_query = select(UserModel).where(
            UserModel.stytch_id == stytch_user_id
        )
        user_result = await session.execute(user_query)
        user = user_result.scalar_one_or_none()
        if not user:
            raise HTTPException(
                status_code=403,
                detail="User not found"
            )

        return {
            "stytch_user_id": stytch_user_id,
            "id": str(user.id)
        }
Using JWTBearer
Basic Authentication
Protect an endpoint by requiring a valid JWT token:
from fastapi import APIRouter, Depends
from src.dependencies.security import JWTBearer
router = APIRouter()
@router.get("/profile")
async def get_user_profile(
    user: dict = Depends(JWTBearer())
):
    """
    Return the caller's identifiers; reachable only with a valid JWT token.

    JWTBearer supplies 'user' in this shape:
    {
        "stytch_user_id": "user-live-xxx...",
        "id": "5e6f7g8h-9i0j-1k2l-3m4n-5o6p7q8r9s0t"
    }
    """
    internal_id = user["id"]
    stytch_id = user["stytch_user_id"]
    return {"user_id": internal_id, "stytch_id": stytch_id}
The JWTBearer dependency returns a dictionary with user information:
@router.get("/dashboard")
async def get_dashboard(user: dict = Depends(JWTBearer())):
    """Build the dashboard payload for the authenticated user."""
    # "id" is the internal user UUID, "stytch_user_id" is the Stytch user ID.
    # The Stytch ID is read only to illustrate what the context contains.
    internal_id, stytch_id = user["id"], user["stytch_user_id"]

    # Database lookups are keyed on the internal id.
    dashboard_payload = await fetch_user_data(internal_id)
    return {"dashboard": dashboard_payload}
RBAC Dependency
The RBAC class enforces role-based permissions on resources.
Implementation
class RBAC:
    """
    Role-Based Access Control dependency.

    Built with a resource name and an action name; when called as a
    dependency it verifies that the authenticated user holds a matching
    permission in the given organization.

    Usage:
        @router.get("/resource", dependencies=[Depends(RBAC("resource", "read"))])
    """

    def __init__(self, required_resource: str, required_action: str):
        self.required_resource = required_resource
        self.required_action = required_action

    async def __call__(
        self,
        org_id: UUID,
        user: dict = Depends(JWTBearer()),
        session: AsyncSession = Depends(get_db),
    ) -> dict:
        """
        Check the user's permissions inside one organization.

        Args:
            org_id: Organization whose membership is consulted
            user: User context produced by JWTBearer
            session: Database session

        Returns:
            The unchanged user context when access is granted

        Raises:
            HTTPException: 403 when no matching permission is found
        """
        member_id = UUID(user["id"])

        # Only an active membership of this user in this organization counts.
        active_membership = and_(
            OrganizationMemberModel.user_id == member_id,
            OrganizationMemberModel.organization_id == org_id,
            OrganizationMemberModel.status == "active"
        )

        # membership -> role -> role/permission link -> permission name
        statement = select(PermissionModel.name).select_from(
            OrganizationMemberModel
        )
        statement = statement.join(
            RoleModel,
            OrganizationMemberModel.role_id == RoleModel.id
        )
        statement = statement.join(
            RolePermissionModel,
            RoleModel.id == RolePermissionModel.role_id
        )
        statement = statement.join(
            PermissionModel,
            RolePermissionModel.permission_id == PermissionModel.id
        )
        statement = statement.where(active_membership)

        rows = await session.execute(statement)
        granted = {record[0] for record in rows.fetchall()}

        required_permission = f"{self.required_resource}:{self.required_action}"
        accepted_names = (
            required_permission,                # exact match
            f"*:{self.required_action}",        # any resource, specific action
            f"{self.required_resource}:*",      # specific resource, any action
            "*:*",                              # full access
        )
        if any(name in granted for name in accepted_names):
            return user

        # Nothing the user holds covers the requested resource/action.
        raise HTTPException(
            status_code=403,
            detail=f"Insufficient permissions. Required: {required_permission}"
        )
Using RBAC
Method 1: Dependencies Parameter (Recommended)
Use the dependencies parameter for cleaner code when you don't need the user context:
@router.delete(
    "/kb/{kb_id}",
    # Route-level dependency: RBAC ensures the caller has "kb:delete"
    # permission. Its return value (the user context) is discarded, so the
    # handler signature stays free of an unused parameter.
    dependencies=[Depends(RBAC("kb", "delete"))],
)
async def delete_knowledge_base(
    kb_id: UUID,
    org_id: UUID,
    session: AsyncSession = Depends(get_db)
):
    """
    Delete a knowledge base.

    Only users with 'kb:delete' permission can access this endpoint.

    Args:
        kb_id: Identifier of the knowledge base to delete.
        org_id: Organization context; the RBAC dependency declares the same
            parameter for its permission check.
        session: Database session.
    """
    # Implementation...
    pass
Method 2: Direct Dependency
Use as a direct dependency when you need the user context:
@router.post("/kb/create")
async def create_knowledge_base(
    kb_data: KBCreate,
    org_id: UUID,
    user: dict = Depends(RBAC("kb", "write")),
    session: AsyncSession = Depends(get_db)
):
    """
    Create a knowledge base.

    The 'user' parameter contains user info AND verifies permission: RBAC
    returns the user context only after the "kb:write" check has passed.

    Args:
        kb_data: Payload describing the knowledge base; only ``name`` is read.
        org_id: Organization that will own the knowledge base.
        user: User context returned by the RBAC dependency.
        session: Database session.

    Returns:
        The newly persisted knowledge base.
    """
    user_id = user["id"]

    # Create KB owned by this user
    kb = KnowledgeBaseModel(
        name=kb_data.name,
        owner_id=UUID(user_id),
        organization_id=org_id
    )
    session.add(kb)
    await session.commit()

    # Reload the row before returning it. With SQLAlchemy's default
    # expire_on_commit=True the instance's attributes are expired by commit(),
    # and an async session cannot lazily reload them while the response is
    # being serialized. The refresh is harmless if expiry is disabled.
    await session.refresh(kb)
    return kb
Permission Patterns
Standard Permissions
# Read access
@router.get("/resource/{id}", dependencies=[Depends(RBAC("resource", "read"))])
# Write access (create/update)
@router.post("/resource", dependencies=[Depends(RBAC("resource", "write"))])
@router.put("/resource/{id}", dependencies=[Depends(RBAC("resource", "write"))])
# Delete access
@router.delete("/resource/{id}", dependencies=[Depends(RBAC("resource", "delete"))])
# Admin access (full control)
@router.post("/resource/admin", dependencies=[Depends(RBAC("resource", "admin"))])
Resource Types
Common resources in Definable:
kb - Knowledge bases
conversation - Conversations/chats
agent - AI agents
tool - Tools
organization - Organization settings
user - User management
role - Role management
api_key - API key management
Action Types
Common actions:
read - View/list resources
write - Create/update resources
delete - Delete resources
admin - Full administrative access
execute - Execute/run resources (agents, tools)
Combining Dependencies
Both Authentication and Authorization
Most endpoints need both:
@router.get("/agents/list")
async def list_agents(
    org_id: UUID,
    user: dict = Depends(RBAC("agent", "read")),
    session: AsyncSession = Depends(get_db)
):
    """
    List agents for an organization.

    A single RBAC dependency covers both concerns:
    1. The JWT token is validated (JWTBearer is a sub-dependency of RBAC)
    2. The "agent:read" permission is checked (RBAC itself)
    3. The user context is handed to this function as 'user'
    """
    # Authentication and authorization have both passed by this point.
    return None
Multiple Permission Checks
For endpoints requiring multiple permissions:
async def require_multiple_permissions(
    org_id: UUID,
    user: dict = Depends(JWTBearer()),
    session: AsyncSession = Depends(get_db)
) -> dict:
    """Custom dependency that demands kb:read and then agent:read."""
    # Checked in order; the first missing permission decides the error text.
    for resource, action in (("kb", "read"), ("agent", "read")):
        check = RBAC(resource, action)
        try:
            await check(org_id, user, session)
        except HTTPException:
            raise HTTPException(
                status_code=403,
                detail=f"Missing {resource}:{action} permission"
            )
    return user
@router.get("/combined-resource")
async def get_combined_resource(
    org_id: UUID,
    user: dict = Depends(require_multiple_permissions)
):
    """Reachable only when the caller holds kb:read AND agent:read."""
    return None
WebSocket Authentication
WebSockets require special handling since they can't use the Authorization header.
WebSocket Token Passing
Pass the JWT token as a query parameter:
// Client-side JavaScript
const token = localStorage.getItem('auth_token');
const socket = new WebSocket(`wss://api.definable.ai/ws?token=${token}`);
Protecting WebSocket Endpoints
# WebSocketDisconnect is caught below, so it has to be imported as well.
from fastapi import Depends, WebSocket, WebSocketDisconnect
from src.dependencies.security import JWTBearer


@router.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user: dict = Depends(JWTBearer())
):
    """
    WebSocket endpoint with JWT authentication.

    Token must be provided as query parameter: /ws?token=xxx

    Args:
        websocket: The client connection.
        user: User context returned by JWTBearer; resolved before the
            connection is accepted.
    """
    await websocket.accept()
    user_id = user["id"]
    try:
        while True:
            data = await websocket.receive_text()
            # Process authenticated WebSocket messages
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client closing the socket is the normal way out of the loop.
        print(f"User {user_id} disconnected")
WebSocket with RBAC
@router.websocket("/ws/chat/{conversation_id}")
async def chat_websocket(
    websocket: WebSocket,
    conversation_id: UUID,
    org_id: UUID,
    user: dict = Depends(JWTBearer()),
    session: AsyncSession = Depends(get_db)
):
    """WebSocket that requires the conversation:write permission."""
    # RBAC is invoked by hand here rather than declared as a dependency.
    permission_check = RBAC("conversation", "write")
    try:
        await permission_check(org_id, user, session)
    except HTTPException:
        # Refuse the connection instead of accepting it.
        await websocket.close(code=1008, reason="Insufficient permissions")
        return
    else:
        # Permission granted: accept the WebSocket connection.
        await websocket.accept()
    # Continue with WebSocket logic...
Error Handling
Authentication Errors
401 Unauthorized - Token is invalid or missing:
HTTPException(status_code=401, detail="Invalid or expired token")
Common causes:
- Token expired
- Invalid token signature
- Malformed token
- Token from wrong environment (test vs live)
Authorization Errors
403 Forbidden - User lacks required permission:
HTTPException(status_code=403, detail="Insufficient permissions")
Common causes:
- User doesn't have the required role
- User's role lacks the required permission
- User is not a member of the organization
- User's membership is not active
Custom Error Responses
@router.get("/resource/{id}")
async def get_resource(
    id: UUID,
    org_id: UUID,
    user: dict = Depends(RBAC("resource", "read")),
    session: AsyncSession = Depends(get_db)
):
    """
    Fetch a single resource, mapping failures to HTTP errors.

    Args:
        id: Identifier of the resource (path parameter).
        org_id: Organization context used by the RBAC check.
        user: User context returned by the RBAC dependency.
        session: Database session.

    Returns:
        The resource returned by ``fetch_resource``.

    Raises:
        HTTPException: 404 when the resource does not exist, 500 for any
            other failure while fetching it.
    """
    try:
        resource = await fetch_resource(id, session)
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")
        return resource
    except HTTPException:
        # Deliberate HTTP errors (such as the 404 above) pass through untouched.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is kept for server logs.
        # NOTE(review): str(e) is returned to the client and may expose
        # internal details; consider a generic message here.
        raise HTTPException(status_code=500, detail=str(e)) from e
Testing Protected Endpoints
Unit Testing with Mocked Dependencies
from unittest.mock import Mock
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def mock_jwt_bearer(mocker):
    """
    Mock JWTBearer dependency.

    Patches ``JWTBearer.__call__`` so that it yields a fixed user context,
    and returns that context so tests can assert against it.
    """
    mock_user = {
        # Must be a UUID string: JWTBearer returns str(user.id) and RBAC
        # parses the value with UUID(...), so a free-form id would make
        # RBAC-protected endpoints fail with ValueError.
        "id": "00000000-0000-0000-0000-000000000001",
        "stytch_user_id": "user-test-xxx"
    }
    mocker.patch(
        "src.dependencies.security.JWTBearer.__call__",
        return_value=mock_user
    )
    return mock_user
def test_protected_endpoint(client: TestClient, mock_jwt_bearer):
    """A request to /profile succeeds when authentication is mocked."""
    auth_headers = {"Authorization": "Bearer mock-token"}
    response = client.get("/profile", headers=auth_headers)
    assert response.status_code == 200
Integration Testing with Real Tokens
def test_with_real_token(client: TestClient):
    """Log in through the test endpoint, then call /profile with the token."""
    # Step 1: obtain a token from the login endpoint.
    credentials = {
        "email": "test@example.com",
        "password": "TestPassword123!"
    }
    login_response = client.post("/api/auth/test_login", json=credentials)
    token = login_response.json()["token"]

    # Step 2: present the token to the protected endpoint.
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = client.get("/profile", headers=auth_headers)
    assert response.status_code == 200
Best Practices
1. Always Use Dependencies
Don't manually parse tokens or check permissions:
# ❌ BAD: Manual token parsing
@router.get("/resource")
async def get_resource(request: Request):
token = request.headers.get("Authorization")
# Manual validation...
# ✅ GOOD: Use dependencies
@router.get("/resource")
async def get_resource(user: dict = Depends(JWTBearer())):
# Token automatically validated
2. Specific Permissions
Use specific permissions instead of wildcards when possible:
# ✅ GOOD: Specific permission
Depends(RBAC("kb", "delete"))
# ⚠️ AVOID: Too broad
Depends(RBAC("*", "*"))
3. Organization Context
Always pass org_id for RBAC checks:
@router.get("/resource")
async def get_resource(
    org_id: UUID,  # RBAC needs the organization to check against
    user: dict = Depends(RBAC("resource", "read"))
):
    """Example endpoint that declares org_id alongside its RBAC dependency."""
    return None
4. Consistent Error Messages
Use consistent error messages for security:
# Don't reveal whether user exists
raise HTTPException(status_code=403, detail="Access denied")
# Instead of
raise HTTPException(status_code=404, detail="User not found")
5. Logging
Log authentication failures for security monitoring:
import logging
logger = logging.getLogger(__name__)
try:
user = await JWTBearer()(request)
except HTTPException:
logger.warning(f"Failed authentication attempt from {request.client.host}")
raise
Database Queries
The RBAC dependency performs database queries to check permissions. For high-traffic endpoints, consider:
- Caching: Cache role-permission mappings
- Connection Pooling: Use proper database connection pools
- Indexes: Ensure proper indexes on the organization_members, roles, and permissions tables
Token Validation
- JWKS Caching: Stytch public keys are cached for 10 minutes
- Connection Reuse: HTTP connections to Stytch are reused
- Local Validation: Most JWT validation happens locally
Next Steps