This guide covers common database-related issues that may arise when working with the Definable backend.

Connection Issues

Symptoms:
  • Error: Could not connect to server: Connection refused
  • Error: OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed
Solutions:
  1. Verify PostgreSQL is running:
    # For Linux
    sudo systemctl status postgresql
    
    # For macOS
    brew services list | grep postgres
    
    # For Windows
    # Check Services application (services.msc)
    
  2. Check connection parameters:
    • Verify hostname, port, username, password, and database name
    • Make sure database exists: psql -U postgres -c "SELECT datname FROM pg_database;"
  3. Test connection with psql:
    psql -U postgres -h localhost -p 5432 -d postgres
    
  4. Check PostgreSQL logs:
    # Location varies by system
    sudo tail -f /var/log/postgresql/postgresql-14-main.log  # Debian/Ubuntu
    sudo tail -f /var/lib/pgsql/data/log/postgresql-*.log  # RHEL/CentOS
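
If psql is not available, a quick TCP check from Python can tell you whether anything is listening on the port at all. A minimal sketch; the host and port are assumptions, adjust them to your settings:
    import socket
    
    def port_is_open(host: str = "localhost", port: int = 5432, timeout: float = 3.0) -> bool:
        """Return True if something accepts TCP connections on host:port."""
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            return False
    
    print(port_is_open())  # False usually means PostgreSQL is not running or a firewall is in the way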
    
Symptoms:
  • Error: asyncpg.exceptions.PostgresConnectionError: connection to server was closed
  • Error: asyncpg.exceptions.InvalidAuthorizationSpecificationError: password authentication failed
Solutions:
  1. Check your connection string format:
    # Correct format for asyncpg
    "postgresql+asyncpg://username:password@hostname:port/database"
    
  2. URL encode special characters in password:
    # If your password contains special characters like @, %, etc.
    from urllib.parse import quote_plus
    password = quote_plus("your@complex!password")
    connection_string = f"postgresql+asyncpg://username:{password}@hostname:port/database"
    
  3. Check PostgreSQL authentication settings (pg_hba.conf):
    • Ensure it allows password authentication (md5 or scram-sha-256)
    • For development, temporarily set local connections to 'trust'
  4. Test with a minimal example:
    import asyncio
    import asyncpg
    
    async def test_connection():
        conn = await asyncpg.connect("postgresql://username:password@localhost/postgres")
        version = await conn.fetchval("SELECT version();")
        print(version)
        await conn.close()
    
    asyncio.run(test_connection())
    
Symptoms:
  • Error: ssl_error_want_read or ssl_error_want_write
  • Error: SSL SYSCALL error: EOF detected
Solutions:
  1. Disable SSL for local development (if needed):
    # In your connection string
    postgresql+asyncpg://username:password@hostname:port/database?ssl=disable
    
  2. For production, configure SSL properly:
    # Require SSL
    postgresql+asyncpg://username:password@hostname:port/database?ssl=require
    
    # Require SSL and verify the server certificate
    postgresql+asyncpg://username:password@hostname:port/database?ssl=verify-full
    
  3. For Supabase with SSL issues:
    • Use the connection pooler URL instead of direct connection
    • Go to Supabase dashboard > Project Settings > Database > Connection Pooling

Migration Issues

Symptoms:
  • Error: FAILED: Multiple head revisions are present
  • Error: Can't locate revision identified by '...'
Solutions:
  1. For multiple heads:
    # List heads
    alembic heads
    
    # Create a merge migration
    alembic merge -m "merge heads" head1 head2
    
    # Then upgrade to the merged head
    alembic upgrade head
    
  2. For missing revisions:
    # Fix the down_revision in the problematic migration file
    # Make sure all migration files are in the versions directory
    
    # Or start fresh (development only)
    alembic stamp base  # Reset alembic version table
    alembic upgrade head  # Apply all migrations
    
  3. Check alembic configuration:
    • Verify alembic.ini has correct database URL
    • Check env.py imports your SQLAlchemy models correctly
Symptoms:
  • Error: Target database is not up to date.
  • Error: Can't locate revision identified by '...'
Solutions:
  1. Synchronize with the team:
    # Pull latest migrations from version control
    git pull
    
    # Check current database revision
    alembic current
    
    # Upgrade to latest
    alembic upgrade head
    
  2. For development, reset migration state:
    # WARNING: This will destroy data in the development database
    # DROP DATABASE and recreate it
    
    # Then reset alembic
    alembic stamp base
    alembic upgrade head
    
  3. Fix revision chain manually:
    • Edit the down_revision in migration files to fix the chain
    • Use alembic history to understand the current chain
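
For reference, each migration file declares its position in the chain near the top, and repointing down_revision is usually all that is needed to repair the chain. A sketch with illustrative identifiers and file name:
    # versions/20240101_add_users_table.py
    revision = "a1b2c3d4e5f6"       # this migration's id
    down_revision = "0f9e8d7c6b5a"  # must equal the `revision` of its parent;
                                    # None only for the very first migration
    
    def upgrade():
        ...
    
    def downgrade():
        ...
    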
Symptoms:
  • alembic revision --autogenerate doesn’t detect model changes
  • Generated migration has unexpected changes
Solutions:
  1. Ensure models are imported in env.py (see the env.py sketch after this list):
    # In alembic/env.py
    from src.models import *  # Make sure your models are imported here
    
  2. Check model metadata:
    # Make sure your models use the correct metadata
    from src.database import Base
    
    class YourModel(Base):
        # model definition...
    
  3. Run with verbose output:
    alembic revision --autogenerate -m "your message" --verbose
    
  4. Check for unsupported model features:
    • Some SQLAlchemy constructs aren’t detected by Alembic
    • Add manual migrations for: Constraints, Indexes, some Column types
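
Autogenerate only compares the database against the metadata that env.py hands it, so the first two points above come down to the same file. A minimal sketch of the relevant part of alembic/env.py, assuming models live in src.models and Base in src.database as in the snippets above:
    # alembic/env.py (excerpt)
    from src.database import Base   # the declarative Base your models inherit from
    from src.models import *        # noqa: F401,F403  (importing registers every model on Base.metadata)
    
    # Alembic diffs the database against this metadata during --autogenerate
    target_metadata = Base.metadata
    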
Symptoms:
  • Error like: ProgrammingError: column X does not exist
  • Error when executing migration’s upgrade() function
Solutions:
  1. Edit the migration file:
    • Fix the SQL or Alembic operations causing errors
  2. For data corruption, clean approach (development only):
    # Reset to previous working migration
    alembic downgrade your_last_working_revision
    
    # Fix the problematic migration file
    
    # Try upgrading again
    alembic upgrade head
    
  3. For schema issues where tables already exist:
    # In your migration file, check whether the table exists before creating it
    from sqlalchemy import inspect
    
    if not inspect(op.get_bind()).has_table('table_name'):
        op.create_table(...)
    

SQLAlchemy Issues

Symptoms:
  • Error: TimeoutError: Connection attempt timed out
  • Application hangs when connecting to database
Solutions:
  1. Configure connection pooling correctly:
    # Adjust pool settings
    engine = create_async_engine(
        DATABASE_URL,
        pool_size=5,  # Adjust based on your needs
        max_overflow=10,
        pool_timeout=30,  # Seconds
        pool_recycle=1800,  # Recycle connections after 30 minutes
    )
    
  2. Implement retry logic for transient failures:
    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def run_query(query):
        # Do the work inside the context manager: returning the raw connection
        # from inside "async with" would hand back an already-closed connection
        async with engine.connect() as conn:
            result = await conn.execute(query)
            return result.all()
    
  3. Check for database resource limitations:
    • Maximum connections (max_connections in postgresql.conf)
    • Check current connections: SELECT count(*) FROM pg_stat_activity;
Symptoms:
  • Error: AssertionError: A sync operation occurred within an async transaction.
  • Error: InterfaceError: connection is closed
Solutions:
  1. Ensure you’re using async operations throughout:
    # Correct async pattern
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(query)
            # Don't mix sync and async operations
    
  2. Check for instances of sync operations:
    • Replace query.all() with (await session.execute(query)).scalars().all()
    • Replace query.first() with (await session.execute(query)).scalars().first()
    • Use await session.commit() instead of session.commit()
  3. Fix connection closing issues:
    # Create a fresh session for each request
    async def get_db():
        async with async_session() as session:
            try:
                yield session
            finally:
                await session.close()
    
Symptoms:
  • Unexpected query results
  • Error: AttributeError: 'Query' object has no attribute 'xxx'
Solutions:
  1. Debug queries by logging SQL:
    # Add to your settings or session creation
    engine = create_async_engine(
        DATABASE_URL,
        echo=True,  # This will log all SQL
    )
    
  2. Review SQLAlchemy 2.0-style execution:
    # 2.0 style querying
    from sqlalchemy import select
    
    # Query
    query = select(User).where(User.email == email)
    result = await session.execute(query)
    user = result.scalar_one_or_none()
    
  3. Check for ORM vs. Core confusion:
    • Result objects differ between ORM queries and Core queries
    • For ORM: Use .scalars() to get model instances
    • For Core: Use .mappings() to get dictionaries
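
A short sketch of the difference, reusing the User/email example above:
    from sqlalchemy import select
    
    # ORM query: .scalars() yields User instances
    result = await session.execute(select(User).where(User.email == email))
    users = result.scalars().all()   # [User(...), ...]
    
    # Core-style query of individual columns: .mappings() yields dict-like rows
    result = await session.execute(select(User.id, User.email))
    rows = result.mappings().all()   # [{"id": ..., "email": ...}, ...]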

PostgreSQL Issues

Symptoms:
  • Error: permission denied for schema public
  • Error: permission denied for relation your_table
Solutions:
  1. Grant permissions to your database user:
    -- Connect as superuser
    GRANT ALL PRIVILEGES ON DATABASE your_database TO your_user;
    GRANT ALL PRIVILEGES ON SCHEMA public TO your_user;
    GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO your_user;
    GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO your_user;
    
  2. Check current user and permissions:
    -- Who am I?
    SELECT current_user;
    
    -- What permissions do I have?
    SELECT * FROM information_schema.role_table_grants 
    WHERE grantee = current_user;
    
  3. For hosted databases with restricted permissions:
    • Use the admin/owner account instead of a restricted role
    • Contact your database provider for assistance
Symptoms:
  • Poor search results
  • Performance issues with text search
Solutions:
  1. Create proper indexes:
    -- Create GIN index for full-text search
    CREATE INDEX idx_your_table_search ON your_table USING GIN (to_tsvector('english', your_text_column));
    
  2. Optimize your search queries:
    # Efficient full-text search with SQLAlchemy
    from sqlalchemy import text
    
    query = text("""
        SELECT * FROM your_table
        WHERE to_tsvector('english', your_text_column) @@ plainto_tsquery('english', :search_term)
        ORDER BY ts_rank(to_tsvector('english', your_text_column), plainto_tsquery('english', :search_term)) DESC
    """)
    
    result = await session.execute(query, {"search_term": search_term})
    
  3. Consider using a vector database for semantic search:
    • Use pgvector for embedding-based search
    • Create appropriate indexes for vector columns
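
If you take the pgvector route, here is a sketch of an embedding column and a cosine-distance query. It assumes the pgvector Python package is installed, the extension is enabled, and Base is the declarative base from earlier; Document, the 1536 dimension, and query_embedding are illustrative:
    import uuid
    
    from pgvector.sqlalchemy import Vector
    from sqlalchemy import Column, select
    from sqlalchemy.dialects.postgresql import UUID
    
    class Document(Base):
        __tablename__ = "documents"
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        embedding = Column(Vector(1536))  # dimension must match your embedding model
    
    # Nearest neighbours by cosine distance
    query_embedding = [0.0] * 1536  # illustrative; normally comes from your embedding model
    query = (
        select(Document)
        .order_by(Document.embedding.cosine_distance(query_embedding))
        .limit(10)
    )
    result = await session.execute(query)
    matches = result.scalars().all()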

Performance Issues

Symptoms:
  • Database operations taking too long
  • API response times deteriorating
Solutions:
  1. Identify slow queries (an application-side timing sketch follows this list):
    -- Find slow queries (PostgreSQL 13+ uses total_exec_time / mean_exec_time;
    -- older versions use total_time / mean_time)
    SELECT query, calls, total_exec_time, mean_exec_time
    FROM pg_stat_statements
    ORDER BY mean_exec_time DESC
    LIMIT 10;
    
    -- If pg_stat_statements is not enabled, add it to shared_preload_libraries
    -- in postgresql.conf and run: CREATE EXTENSION pg_stat_statements;
    
  2. Use EXPLAIN ANALYZE to understand query plans:
    EXPLAIN ANALYZE your_slow_query;
    
  3. Add appropriate indexes:
    -- For columns used in WHERE clauses
    CREATE INDEX idx_your_table_column ON your_table(your_column);
    
    -- For foreign keys
    CREATE INDEX idx_your_table_fk_column ON your_table(foreign_key_column);
    
  4. Optimize your queries:
    • Use LIMIT to restrict result size
    • Only select needed columns
    • Avoid multiple joins when possible
    • Consider pagination for large result sets
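
To catch slow queries from the application side as well, SQLAlchemy's cursor events can time every statement. A sketch, assuming engine is the AsyncEngine created earlier; the events must be attached to the sync engine that backs it, and the 0.5 s threshold is arbitrary:
    import logging
    import time
    
    from sqlalchemy import event
    
    sync_engine = engine.sync_engine
    
    @event.listens_for(sync_engine, "before_cursor_execute")
    def _start_timer(conn, cursor, statement, parameters, context, executemany):
        conn.info.setdefault("query_start", []).append(time.perf_counter())
    
    @event.listens_for(sync_engine, "after_cursor_execute")
    def _log_if_slow(conn, cursor, statement, parameters, context, executemany):
        elapsed = time.perf_counter() - conn.info["query_start"].pop()
        if elapsed > 0.5:  # seconds
            logging.warning("Slow query (%.2fs): %s", elapsed, statement)
    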
Symptoms:
  • Error: FATAL: too many connections
  • Applications waiting for database connections
Solutions:
  1. Configure connection pooling properly:
    # Adjust pool settings in SQLAlchemy
    engine = create_async_engine(
        DATABASE_URL,
        pool_size=5,  # Start with a reasonable value
        max_overflow=10,
        pool_pre_ping=True,  # Check connections before using them
    )
    
  2. Check current connections and limits:
    -- Current connections
    SELECT count(*) FROM pg_stat_activity;
    
    -- Connection limit
    SHOW max_connections;
    
  3. For production, consider external connection poolers:
    • PgBouncer
    • AWS RDS Proxy
    • Supabase Connection Pooler
Symptoms:
  • Out of memory errors
  • Slow API responses when retrieving many records
Solutions:
  1. Implement pagination:
    # With SQLAlchemy
    query = select(YourModel).offset(offset).limit(limit)
    
    # In API endpoints (assumes the get_db dependency shown elsewhere in this guide)
    @app.get("/items/")
    async def get_items(offset: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
        result = await db.execute(select(YourModel).offset(offset).limit(limit))
        return result.scalars().all()
    
  2. Use cursor-based pagination for large datasets:
    # Using keyset pagination (more efficient than offset/limit)
    # Example: sorting by id
    query = select(YourModel).where(YourModel.id > last_id).order_by(YourModel.id).limit(limit)
    
  3. Stream results for large exports:
    # Using SQLAlchemy async streaming
    async def stream_results():
        async with async_session() as session:
            stream = await session.stream(select(YourModel))
            async for row in stream:
                yield row
    

Data Integrity Issues

Symptoms:
  • Error: invalid input syntax for type uuid
  • Missing UUID values in records
Solutions:
  1. Ensure UUIDs are used consistently:
    # In your SQLAlchemy models
    from sqlalchemy.dialects.postgresql import UUID
    import uuid
    
    class YourModel(Base):
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    
  2. For migrations involving UUID columns:
    # In alembic migrations
    from sqlalchemy.dialects import postgresql
    
    op.create_table(
        'your_table',
        sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text("gen_random_uuid()"), primary_key=True),
        # other columns...
    )
    
  3. Check database and python type consistency:
    # Convert strings to UUID when needed
    from uuid import UUID
    
    def validate_uuid(value):
        if isinstance(value, str):
            return UUID(value)
        return value
    
Symptoms:
  • Error: violates foreign key constraint
  • Error: insert or update on table violates foreign key constraint
Solutions:
  1. Check referential integrity:
    -- Find the constraint that's failing
    SELECT conname, conrelid::regclass, confrelid::regclass
    FROM pg_constraint
    WHERE contype = 'f';
    
    -- Check if referenced id exists
    SELECT * FROM parent_table WHERE id = 'referenced_id';
    
  2. Fix application logic:
    • Ensure parent records exist before creating child records
    • Implement proper cascading deletes in models (a fuller sketch follows this list)
    # In SQLAlchemy model
    parent_id = Column(UUID(as_uuid=True), ForeignKey("parent.id", ondelete="CASCADE"))
    
  3. For migration or data fixing:
    -- Temporarily disable constraints (for data fixing only)
    SET session_replication_role = 'replica';
    -- Fix data...
    SET session_replication_role = 'origin';
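
A fuller sketch of wiring the cascade at both the database level (ondelete) and the ORM level, using illustrative Parent/Child models and the Base from earlier:
    import uuid
    
    from sqlalchemy import Column, ForeignKey
    from sqlalchemy.dialects.postgresql import UUID
    from sqlalchemy.orm import relationship
    
    class Parent(Base):
        __tablename__ = "parent"
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        children = relationship("Child", back_populates="parent",
                                cascade="all, delete-orphan", passive_deletes=True)
    
    class Child(Base):
        __tablename__ = "child"
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        parent_id = Column(UUID(as_uuid=True),
                           ForeignKey("parent.id", ondelete="CASCADE"), nullable=False)
        parent = relationship("Parent", back_populates="children")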
    
Symptoms:
  • Error: duplicate key value violates unique constraint
  • Insert or update operations failing
Solutions:
  1. Check existing data:
    -- Find duplicate records
    SELECT column_name, COUNT(*) 
    FROM your_table 
    GROUP BY column_name 
    HAVING COUNT(*) > 1;
    
  2. Implement upsert logic:
    # Using SQLAlchemy 2.0 style
    from sqlalchemy.dialects.postgresql import insert
    
    stmt = insert(YourModel).values(
        email="user@example.com",
        name="User"
    )
    
    # ON CONFLICT DO UPDATE
    stmt = stmt.on_conflict_do_update(
        index_elements=['email'],
        set_=dict(name="User")
    )
    
    await session.execute(stmt)
    
  3. For batch operations, find and filter duplicates before insert:
    # Get existing records
    existing = await session.execute(
        select(YourModel.email).where(YourModel.email.in_(emails_to_insert))
    )
    existing_emails = {r[0] for r in existing.fetchall()}
    
    # Filter out duplicates
    new_items = [item for item in items if item.email not in existing_emails]
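
Alternatively, a batch insert can skip duplicates at the database level with ON CONFLICT DO NOTHING. A sketch, assuming email carries the unique constraint and items provides email/name as in the snippet above:
    from sqlalchemy.dialects.postgresql import insert
    
    stmt = insert(YourModel).values(
        [{"email": item.email, "name": item.name} for item in items]
    ).on_conflict_do_nothing(index_elements=["email"])
    
    await session.execute(stmt)
    await session.commit()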
    

Advanced Issues

Symptoms:
  • Error: Unexpected JSON type
  • Problems querying or updating JSON fields
Solutions:
  1. Use appropriate column type:
    # In models
    from sqlalchemy.dialects.postgresql import JSONB
    
    settings = Column(JSONB, nullable=False, server_default='{}')
    
  2. Query JSON data efficiently:
    # Get records where settings contains specific key
    query = select(YourModel).where(YourModel.settings.contains({"key": "value"}))
    
    # Get records based on JSON path
    query = select(YourModel).where(YourModel.settings["nested"]["key"].astext == "value")
    
  3. Update JSON fields:
    # Update a specific key in a JSONB column
    from sqlalchemy import func, update
    
    await session.execute(
        update(YourModel)
        .where(YourModel.id == model_id)
        .values(settings=func.jsonb_set(YourModel.settings, '{key}', '"new_value"', True))
    )
    
Symptoms:
  • Deadlocks
  • Error: current transaction is aborted, commands ignored until end of transaction block
Solutions:
  1. Use proper transaction patterns:
    # Context manager approach
    async with session.begin():
        # All operations here are in a transaction
        # Auto-commits at the end or rolls back on exception
    
    # Explicit approach
    try:
        await session.begin()
        # operations...
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    
  2. Handle nested transactions:
    # Using a savepoint so a failure does not abort the outer transaction
    try:
        async with session.begin_nested():
            # operations that might fail; the savepoint is released
            # automatically when the block exits without error
            ...
    except Exception:
        # rolled back to the savepoint only; the outer transaction continues
        pass
    
  3. For deadlocks, implement retry logic:
    from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
    from sqlalchemy.exc import OperationalError
    
    @retry(
        stop=stop_after_attempt(3), 
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type(OperationalError)
    )
    async def execute_with_retry():
        async with session.begin():
            # Your database operations
    
Symptoms:
  • Slow vector similarity searches
  • High CPU usage during vector operations
Solutions:
  1. Create appropriate indexes:
    -- For cosine distance
    CREATE INDEX ON your_table USING ivfflat (embedding vector_cosine_ops)
      WITH (lists = 100);
    
    -- For L2 distance
    CREATE INDEX ON your_table USING ivfflat (embedding vector_l2_ops)
      WITH (lists = 100);
    
  2. Tune search parameters:
    # Limit number of probes (trades accuracy for speed)
    # In SQLAlchemy
    from sqlalchemy import text
    
    # Set session variable for next queries
    await session.execute(text("SET ivfflat.probes = 10;"))
    
    # Then perform the vector search (see the sketch after this list)
    
  3. Optimize embedding dimension and storage:
    • Use the smallest embedding size that maintains accuracy
    • Consider dimensionality reduction techniques
    • For very large collections, implement clustering or partitioning
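
Putting steps 1 and 2 together, a sketch of a tuned cosine-distance search with raw SQL; the <=> operator is pgvector's cosine distance, and query_embedding is an assumed list of floats serialized to pgvector's text format:
    from sqlalchemy import text
    
    # Trade a little accuracy for speed, then search on the same session/connection
    await session.execute(text("SET ivfflat.probes = 10;"))
    
    query_embedding = [0.1, 0.2, 0.3]  # illustrative; must match the column dimension
    result = await session.execute(
        text("""
            SELECT id, embedding <=> CAST(:query_vec AS vector) AS distance
            FROM your_table
            ORDER BY distance
            LIMIT 10
        """),
        {"query_vec": "[" + ",".join(str(x) for x in query_embedding) + "]"},
    )
    for row in result.mappings():
        print(row["id"], row["distance"])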

Supabase Connection Issues

Symptoms:
  • Error: could not connect to server: Connection timed out (0x0000274C/10060) Is the server running on host "db.abcdefghijkl.supabase.co" and accepting TCP/IP connections on port 5432?
  • Connection works from one network but not another
  • Intermittent connection issues
Solutions:
  1. Prefer an IPv4 route to the database:
    # Supabase direct-connection hosts may resolve only over IPv6; if your
    # network lacks IPv6 connectivity, use the connection pooler (see below)
    # or resolve the hostname to an IPv4 address and connect to that IP
    # (see the sketch at the end of this list)
    
  2. Disable IPv6 on your local machine (temporary test):
    # For Linux
    sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
    
    # For Windows (run in PowerShell as Administrator)
    Disable-NetAdapterBinding -Name "*" -ComponentID ms_tcpip6
    
    # For macOS
    networksetup -setv6off Wi-Fi  # or your network interface
    
  3. Use Supabase connection pooler instead of direct connection:
    • Go to Supabase Dashboard > Project Settings > Database > Connection Pooling
    • Use the provided connection string which may bypass IP version issues
    postgresql://[user]:[password]@[host].pooler.supabase.com:6543/postgres
    
  4. Test connection using domain instead of IP address:
    psql -h db.abcdefghijkl.supabase.co -p 5432 -U postgres -d postgres
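
A sketch of resolving the database host to an IPv4 address from Python, as mentioned in step 1. Connecting by IP can break strict SSL hostname verification, so prefer the pooler where possible; the credentials below are placeholders:
    import socket
    
    host = "db.abcdefghijkl.supabase.co"
    # Raises socket.gaierror if the host has no IPv4 address (use the pooler in that case)
    ipv4 = socket.getaddrinfo(host, 5432, family=socket.AF_INET)[0][4][0]
    print(ipv4)
    
    database_url = f"postgresql+asyncpg://username:password@{ipv4}:5432/postgres"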
    
Symptoms:
  • Connections time out after exactly 30-60 seconds
  • Error: OperationalError: could not connect to server: Connection refused
  • Error: ssl_error_syscall with Supabase connection
Solutions:
  1. Check if your network allows outbound connections to port 5432:
    # Test connectivity with netcat
    nc -zv db.abcdefghijkl.supabase.co 5432
    
    # Alternative with telnet
    telnet db.abcdefghijkl.supabase.co 5432
    
  2. Check if your IP is allowlisted in Supabase:
    • Go to Supabase Dashboard > Project Settings > Database > Network Restrictions
    • Add your current IP address to the allowed list
    • If using dynamic IPs, you may need to keep this updated
  3. Use Supabase’s connection pooler for restrictive networks:
    # The pooler listens on port 6543 (transaction mode) and may be reachable
    # when the direct port 5432 is blocked
    postgresql://[user]:[password]@[host].pooler.supabase.com:6543/postgres
    
  4. VPN or proxy solutions:
    • If your network has strict firewall rules, consider using a VPN
    • Configure a proxy that allows PostgreSQL connections
Symptoms:
  • Error: remaining connection slots are reserved for non-replication superuser connections
  • Connections work initially but fail under load
  • Error: sorry, too many clients already when multiple services connect
Solutions:
  1. Configure proper connection pooling in application:
    # Properly configured connection pooling for Supabase
    engine = create_async_engine(
        DATABASE_URL,
        pool_size=3,  # Keep pool size small for Supabase
        max_overflow=2,
        pool_timeout=30,
        pool_recycle=1800,  # Recycle connections after 30 minutes
        pool_pre_ping=True,  # Verify connections are still alive
    )
    
  2. Use Supabase’s connection pooler:
    # Pooler connection string (transaction mode, port 6543)
    postgresql://[user]:[password]@[host].pooler.supabase.com:6543/postgres
    
  3. Implement aggressive connection clean-up:
    # Ensure connections are properly closed
    async def get_db():
        async with async_session() as session:
            try:
                yield session
            finally:
                await session.close()
                
    # For non-FastAPI applications, always use try/finally
    try:
        async with async_session() as session:
            # your code
    finally:
        await engine.dispose()  # Close all connections when done
    
  4. Monitor and adjust connection limits in dashboard:
    • Go to Supabase Dashboard > Project Settings > Database > Connection Pooling
    • Adjust pool mode and connection limits based on your tier
    • For Free and Pro tiers, be especially careful with connection counts
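
To see where the connection slots are actually going, pg_stat_activity can be grouped by state. A sketch using the async session from this guide:
    from sqlalchemy import text
    
    result = await session.execute(text("""
        SELECT state, count(*) AS connections
        FROM pg_stat_activity
        GROUP BY state
        ORDER BY connections DESC
    """))
    for row in result.mappings():
        print(row["state"], row["connections"])
    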
Symptoms:
  • High latency on database operations
  • Intermittent timeouts despite successful connections
  • Connection issues during specific times of day
Solutions:
  1. Select appropriate Supabase region:
    • When creating a new project, choose the region closest to your users/servers
    • For existing projects, consider migrating to a closer region if latency is critical
  2. Configure longer timeouts for high-latency connections:
    # Increase connection timeout and command timeout
    engine = create_async_engine(
        DATABASE_URL,
        connect_args={
            "command_timeout": 30.0,  # Seconds for each query
            "timeout": 60.0           # Seconds for connection establishment
        }
    )
    
  3. Implement connection caching for read-heavy workloads:
    # Use Redis or a simple in-memory cache for frequently accessed data.
    # Note: functools.lru_cache does not work on async functions -- it caches
    # the coroutine object, which can only be awaited once.
    _cache: dict = {}
    
    async def get_cached_data(id):
        if id not in _cache:
            async with async_session() as session:
                result = await session.execute(select(Data).where(Data.id == id))
                _cache[id] = result.scalar_one_or_none()
        return _cache[id]
    
  4. Consider Supabase Edge Functions for latency-sensitive operations:
    • Deploy Edge Functions closer to the database
    • Reduce round-trip time for critical operations
Symptoms:
  • Error: SSL SYSCALL error: EOF detected
  • Error: ssl_error_want_read or ssl_error_want_write
  • Error: SSL error: certificate verify failed
Solutions:
  1. Configure SSL properly:
    # asyncpg takes a single "ssl" argument; sslmode is a libpq/psycopg2 option
    engine = create_async_engine(
        DATABASE_URL,
        connect_args={
            "ssl": "require",  # or "verify-full" for stricter validation
        }
    )
    
  2. For development/testing, disable strict certificate verification (not recommended for production):
    # Less strict SSL for development only
    import ssl
    
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.check_hostname = False
    ssl_ctx.verify_mode = ssl.CERT_NONE
    
    engine = create_async_engine(
        DATABASE_URL,
        connect_args={"ssl": ssl_ctx},
    )
    
  3. Update CA certificates:
    # For Linux
    sudo update-ca-certificates
    
    # For macOS
    brew install openssl@3
    
    # For Windows
    # Check for Windows updates or update browser
    
  4. Use Supabase connection string with SSL parameters:
    postgresql://[user]:[password]@[host].supabase.co:5432/postgres?sslmode=require
    
Symptoms:
  • Hitting connection limits unexpectedly
  • Database operations becoming slower at certain times
  • Error: sorry, too many clients already despite proper connection pooling
Solutions:
  1. Be aware of tier limitations:
    • Free tier: Limited connections, compute, and may pause after inactivity
    • Pro tier: Higher limits but still restricted compared to enterprise
    • Check current tier limits at Supabase dashboard
  2. Implement application-level connection management:
    # Add retry logic with exponential backoff
    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=60))
    async def execute_db_operation():
        async with async_session() as session:
            # Your database operations
    
  3. Add monitoring to track connection usage:
    # Log connection information
    import logging
    
    logging.info(f"Engine stats: {engine.pool.status()}")
    
    # Create dashboards to monitor connection count trends
    
  4. Consider upgrading tier or optimizing application:
    • For production workloads, higher tiers provide better guarantees
    • Implement aggressive connection pooling
    • Add read replicas for read-heavy workloads if available in your tier

Next Steps

If you’ve resolved your database issues, consider reviewing the related guides in this documentation. If you’re still experiencing database problems, check the official PostgreSQL and SQLAlchemy documentation or contact the development team for assistance.