Database Migration Patterns
Best practices for safe database schema migrations.
Migration File Structure
migrations/ ├── versions/ │ ├── 001_initial_schema.py │ ├── 002_add_users_table.py │ ├── 003_add_user_email_index.py │ └── 004_add_orders_table.py ├── alembic.ini └── env.py
Alembic Migration Template
"""Add users table
Revision ID: 002 Revises: 001 Create Date: 2024-01-15 10:00:00.000000 """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql
revision identifiers
revision = '002' down_revision = '001' branch_labels = None depends_on = None
def upgrade() -> None:
    """Create the ``users`` table and its email/status indexes."""
    op.create_table(
        'users',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(255), nullable=True),
        sa.Column('status', sa.String(50), nullable=False, server_default='pending'),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        # NOTE(review): ``onupdate`` is a client-side SQLAlchemy hook and emits
        # no DDL, so the database will not refresh this column by itself —
        # confirm application code or a trigger maintains it.
        sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
    )
    # Unique index doubles as the one-account-per-address guarantee.
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    op.create_index('ix_users_status', 'users', ['status'])
def downgrade() -> None: op.drop_index('ix_users_status') op.drop_index('ix_users_email') op.drop_table('users')
Zero-Downtime Migration Patterns
Pattern 1: Expand and Contract
Step 1: Add new column (nullable)
def upgrade_step1(): op.add_column('users', sa.Column('email_new', sa.String(255), nullable=True))
Step 2: Backfill data (run separately, possibly in batches)
def backfill(): connection = op.get_bind() connection.execute(""" UPDATE users SET email_new = email WHERE email_new IS NULL LIMIT 10000 """)
Step 3: Make new column non-nullable, drop old column
def upgrade_step3(): op.alter_column('users', 'email_new', nullable=False) op.drop_column('users', 'email') op.alter_column('users', 'email_new', new_column_name='email')
Pattern 2: Safe Column Rename
DON'T: Direct rename causes downtime
op.alter_column('users', 'username', new_column_name='email')
DO: Expand-Contract pattern
def upgrade():
    """Expand phase of renaming ``username`` to ``email``.

    Adds the new column, installs a trigger that mirrors every write, then
    backfills the rows written before the trigger existed.
    """
    # 1. New column arrives nullable, so in-flight writes keep working.
    op.add_column('users', sa.Column('email', sa.String(255)))

    # 2. Trigger keeps both columns in sync while old application code is
    #    still writing to ``username`` (PostgreSQL).
    op.execute("""
    CREATE OR REPLACE FUNCTION sync_username_to_email()
    RETURNS TRIGGER AS $$
    BEGIN
        NEW.email = NEW.username;
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER sync_username_email
    BEFORE INSERT OR UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION sync_username_to_email();
    """)

    # 3. Copy over rows that predate the trigger.
    op.execute("UPDATE users SET email = username WHERE email IS NULL")
In a later migration, run only after the application code has been fully updated to use the new column:
def upgrade_cleanup(): op.execute("DROP TRIGGER sync_username_email ON users") op.execute("DROP FUNCTION sync_username_to_email()") op.drop_column('users', 'username')
Pattern 3: Safe Index Creation
def upgrade(): # Create index concurrently to avoid locking op.execute(""" CREATE INDEX CONCURRENTLY ix_users_email ON users (email) """)
def downgrade(): op.execute("DROP INDEX CONCURRENTLY ix_users_email")
Pattern 4: Adding NOT NULL Constraint
def upgrade():
    """Introduce a NOT NULL ``verified`` flag without locking out writers."""
    # Step 1: the column lands nullable, so existing rows need no rewrite.
    op.add_column('users', sa.Column('verified', sa.Boolean(), nullable=True))

    # Step 2: rows inserted from now on default to false.
    op.alter_column('users', 'verified', server_default=sa.false())

    # Step 3: bring historical rows up to date (batch this on large tables).
    op.execute("""
    UPDATE users
    SET verified = false
    WHERE verified IS NULL
    """)

    # Step 4: with no NULLs left, the constraint flip is cheap and safe.
    op.alter_column('users', 'verified', nullable=False)
Batch Processing for Large Tables
from sqlalchemy import text
def backfill_in_batches(connection, batch_size=10000, pause_seconds=0.1):
    """Backfill ``new_column`` from ``old_column`` in small batches.

    Each UPDATE touches at most ``batch_size`` rows, so row locks are held
    briefly; FOR UPDATE SKIP LOCKED lets several backfill workers run in
    parallel without contending on the same rows.

    Args:
        connection: an open SQLAlchemy connection.
        batch_size: rows updated per round trip.
        pause_seconds: sleep between batches to shed load on the database.
    """
    # Local import: ``time`` was never imported at module level in the
    # original snippet, so the sleep below would raise NameError.
    import time

    while True:
        result = connection.execute(text("""
            UPDATE users
            SET new_column = old_column
            WHERE id IN (
                SELECT id FROM users
                WHERE new_column IS NULL
                LIMIT :batch_size
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id
        """), {"batch_size": batch_size})
        updated = result.rowcount
        connection.commit()
        if updated == 0:
            break
        print(f"Updated {updated} rows")
        time.sleep(pause_seconds)  # small delay to reduce load
Data Migration Patterns
Separate Data Migrations
Schema migration (runs during deploy)
def upgrade(): op.add_column('orders', sa.Column('total_cents', sa.BigInteger()))
Data migration (runs separately)
data_migrations/migrate_order_totals.py
def run_data_migration():
    """Convert total from dollars to cents, 5000 rows per batch."""
    # NOTE(review): relies on module-level ``engine`` and ``text`` being in
    # scope — confirm against the surrounding script.
    # The statement is loop-invariant, so build it once up front.
    batch_update = text("""
        UPDATE orders
        SET total_cents = total * 100
        WHERE total_cents IS NULL
        AND id IN (
            SELECT id FROM orders
            WHERE total_cents IS NULL
            LIMIT 5000
        )
    """)
    with engine.connect() as conn:
        while True:
            outcome = conn.execute(batch_update)
            if outcome.rowcount == 0:
                break
            conn.commit()
Foreign Key Constraints
Safe Foreign Key Addition
def upgrade():
    """Attach ``orders.user_id`` → ``users.id`` without a long table lock."""
    # 1. Plain nullable column first: no constraint means no validation scan.
    op.add_column(
        'orders',
        sa.Column('user_id', postgresql.UUID(), nullable=True),
    )

    # 2. Populate it from the legacy identifier mapping.
    op.execute("""
    UPDATE orders o
    SET user_id = (
        SELECT id FROM users u
        WHERE u.legacy_id = o.legacy_user_id
    )
    """)

    # 3. NOT VALID makes the FK apply to *new* rows only, so adding it takes
    #    a brief lock instead of scanning the whole table (PostgreSQL).
    op.execute("""
    ALTER TABLE orders
    ADD CONSTRAINT fk_orders_user_id
    FOREIGN KEY (user_id) REFERENCES users(id)
    NOT VALID
    """)

    # 4. Validation scans existing rows under a light lock, so regular
    #    traffic continues while it runs.
    op.execute("""
    ALTER TABLE orders
    VALIDATE CONSTRAINT fk_orders_user_id
    """)
Version Control Integration
CI/CD pipeline for migrations
# CI/CD pipeline: validate any migration change, then apply it to staging.
name: Database Migration

on:
  push:
    paths:
      - 'migrations/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate migration
        run: |
          alembic check
          alembic upgrade head --sql > /dev/null

      - name: Check for destructive operations
        run: |
          # Fail if migration contains DROP without review
          if grep -r "op.drop" migrations/versions/*.py; then
            echo "::warning::Migration contains DROP operations"
          fi

  deploy-staging:
    needs: validate
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - name: Run migration
        run: alembic upgrade head
Rollback Strategies
Always implement downgrade
def downgrade(): # For additive changes, downgrade is straightforward op.drop_column('users', 'new_column')
For destructive changes, preserve data
def upgrade(): # Rename instead of drop op.rename_table('old_table', '_old_table_backup')
def downgrade(): op.rename_table('_old_table_backup', 'old_table')
Testing Migrations
tests/test_migrations.py
import pytest from alembic.config import Config from alembic import command
@pytest.fixture def alembic_config(): config = Config("alembic.ini") return config
def test_upgrade_downgrade(alembic_config, test_database):
    """The migration chain must survive a head → base → head round trip."""
    # Walk forward to the newest revision...
    command.upgrade(alembic_config, "head")
    # ...unwind everything back to an empty schema...
    command.downgrade(alembic_config, "base")
    # ...and climb back up, proving the chain is fully reversible.
    command.upgrade(alembic_config, "head")
def test_migration_is_reversible(alembic_config, test_database):
    """Each revision individually survives up → one step down → up again."""
    # NOTE(review): ``get_all_revisions`` is not defined in this snippet —
    # confirm it is supplied by a shared test helper.
    for revision in get_all_revisions():
        command.upgrade(alembic_config, revision)
        command.downgrade(alembic_config, "-1")
        command.upgrade(alembic_config, revision)
References
-
Alembic Documentation
-
PostgreSQL ALTER TABLE
-
Zero-Downtime Migrations
-
Safe Database Migrations