Alembic Database Migration Management Skill


Overview

This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.

Core Concepts

What is Alembic?

Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:

  • Version Control: Track all schema changes in your support database

  • Reproducibility: Apply the same migrations across dev, staging, and production

  • Rollback Capability: Safely revert problematic changes

  • Team Collaboration: Merge schema changes from multiple developers

  • Data Preservation: Migrate data during schema transformations

Migration Lifecycle in Support Systems

  • Development: Create migrations locally while developing new features

  • Testing: Validate migrations in staging environment

  • Review: Code review migration scripts before production

  • Deployment: Apply migrations to production with minimal downtime

  • Monitoring: Track migration status and handle failures

  • Rollback: Revert if issues arise in production

Installation and Initial Setup

Installing Alembic

Install Alembic with PostgreSQL support

pip install alembic psycopg2-binary sqlalchemy

Or add to requirements.txt

alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0

Initialize Alembic in Your Project

Initialize Alembic (creates alembic/ directory and alembic.ini)

alembic init alembic

For multiple database support

alembic init --template multidb alembic

This creates:

  • alembic/ : Directory containing migration scripts

  • alembic/versions/ : Where individual migration files live

  • alembic/env.py : Migration environment configuration

  • alembic.ini : Alembic configuration file

Configure Database Connection

Edit alembic.ini to set your database URL:

For development

sqlalchemy.url = postgresql://user:password@localhost/support_dev

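For production (values interpolated from the ini file)

sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s

Note that %(NAME)s interpolation in alembic.ini resolves names defined in the ini file itself, not environment variables, so DB_USER and the other names would have to be set there. Better approach: read the URL from an environment variable in env.py: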

import os
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models
from myapp.models import Base

# This is the Alembic Config object
config = context.config

# Override sqlalchemy.url from the environment.
# set_main_option() applies %-interpolation, so escape any literal '%' in the URL.
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url.replace('%', '%%'))

# Set up target metadata for autogenerate
target_metadata = Base.metadata
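The interpolation behaviour behind the URL settings can be sketched with the standard library's configparser, which Alembic's Config object builds on. This is only an illustration under stated assumptions: the URL, the password containing a percent sign, and the helper name escape_for_ini are hypothetical and not part of Alembic.

```python
import configparser


def escape_for_ini(url: str) -> str:
    """Double every '%' so configparser does not read it as interpolation."""
    return url.replace("%", "%%")


parser = configparser.ConfigParser()
parser.add_section("alembic")

# Hypothetical URL whose password contains a URL-encoded '@' (%40).
raw_url = "postgresql://support:p%40ss@localhost/support_dev"

# Escaped on the way in, the value comes back unchanged on the way out.
parser.set("alembic", "sqlalchemy.url", escape_for_ini(raw_url))
round_tripped = parser.get("alembic", "sqlalchemy.url")

# %(name)s resolves against options in the ini file, not os.environ.
parser.set("alembic", "db_host", "localhost")
parser.set("alembic", "templated", "postgresql://%(db_host)s/support_dev")
templated = parser.get("alembic", "templated")

# Setting the unescaped URL is rejected by configparser.
try:
    parser.set("alembic", "unescaped", raw_url)
    unescaped_rejected = False
except ValueError:
    unescaped_rejected = True
```

The same doubling applies to any value handed to set_main_option(), which is why a password with a percent sign can break an otherwise correct env.py.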

Creating Migrations

Manual Migration Creation

Create a migration manually when you need precise control:

Create empty migration file

alembic revision -m "add ticket priority column"

This generates a file like versions/abc123_add_ticket_priority_column.py:

"""add ticket priority column

Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""

from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Add priority column to tickets table
    op.add_column('tickets',
        sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
    )

    # Create index for performance
    op.create_index('ix_tickets_priority', 'tickets', ['priority'])


def downgrade() -> None:
    # Remove index first
    op.drop_index('ix_tickets_priority', 'tickets')

    # Remove column
    op.drop_column('tickets', 'priority')

Autogenerate Migrations

Let Alembic detect schema changes automatically:

Generate migration by comparing models to database

alembic revision --autogenerate -m "add customer satisfaction table"

Important: Always review autogenerated migrations! They may miss:

  • Renamed columns (appears as drop + add)

  • Changed column types requiring data conversion

  • Complex constraints

  • Data migrations

Example autogenerated migration:

"""add customer satisfaction table

Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])


def downgrade() -> None:
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')

Data Migrations

Migrating Data During Schema Changes

When you need to transform existing data:

"""convert ticket status to new enum

Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

revision = 'data001'
down_revision = 'xyz789'


def upgrade() -> None:
    # Create new status column
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )

    # Migrate data using bulk update
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )

    # Map old statuses to new ones
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }

    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )

    # Make new column non-nullable now that data is migrated
    op.alter_column('tickets', 'status_new', nullable=False)

    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')


def downgrade() -> None:
    # Reverse the migration
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )

    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )

    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }

    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )

    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
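A mapping-based migration like this one leaves NULL in the new column for any status value the mapping does not list, and the later NOT NULL step would then fail. A pre-flight check can surface such values first. The sketch below uses an in-memory SQLite table purely for illustration; the helper find_unmapped_statuses and the sample 'escalated' status are hypothetical, and in a real migration the same query would run on the connection from op.get_bind().

```python
import sqlite3

STATUS_MAPPING = {
    "open": "OPEN",
    "in_progress": "IN_PROGRESS",
    "pending": "WAITING_ON_CUSTOMER",
    "resolved": "RESOLVED",
    "closed": "CLOSED",
}


def find_unmapped_statuses(conn, mapping):
    """Return distinct tickets.status values that the mapping does not cover."""
    rows = conn.execute("SELECT DISTINCT status FROM tickets").fetchall()
    return sorted((row[0] for row in rows if row[0] not in mapping), key=str)


# Illustrative data: 'escalated' is a status nobody remembered to map.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO tickets (status) VALUES (?)",
    [("open",), ("closed",), ("escalated",), ("open",)],
)

unmapped = find_unmapped_statuses(conn, STATUS_MAPPING)
if unmapped:
    print(f"Refusing to migrate, unmapped statuses: {unmapped}")
```

Raising an exception at this point, before any column is altered, keeps the failure cheap and leaves the schema untouched.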

Large Data Migrations with Batching

For large tables, process data in batches:

"""add computed resolution time to tickets

Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select

revision = 'data002'
down_revision = 'data001'


def upgrade() -> None:
    # Add new column
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )

    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )

    # Process in batches to avoid memory issues. Page by primary key
    # (keyset pagination): an OFFSET over a filter that shrinks as rows
    # are updated would skip rows.
    batch_size = 1000
    last_id = 0

    while True:
        # Get the next batch of resolved tickets, ordered by id
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.id > last_id,
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).order_by(tickets.c.id).limit(batch_size)
        ).fetchall()

        if not batch:
            break

        # Update batch
        for row in batch:
            if row.resolved_at and row.created_at:
                resolution_time = (row.resolved_at - row.created_at).total_seconds()
                connection.execute(
                    tickets.update().where(
                        tickets.c.id == row.id
                    ).values(resolution_time_seconds=int(resolution_time))
                )

        last_id = batch[-1].id

    # Unresolved tickets have no resolution time yet, so the column stays
    # nullable; a NOT NULL constraint would fail while those rows hold NULL.


def downgrade() -> None:
    op.drop_column('tickets', 'resolution_time_seconds')
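Batching over a filter such as "rows not yet processed" interacts badly with OFFSET: every processed batch shrinks the filtered set, so advancing the offset skips rows. The following sketch compares OFFSET paging with keyset paging (paging by primary key) on an in-memory SQLite table. The table layout, the done column, and the function names are illustrative assumptions, not part of the migration above.

```python
import sqlite3


def make_db(row_count):
    """Create an in-memory tickets table with `row_count` unprocessed rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY, done INTEGER)")
    conn.executemany(
        "INSERT INTO tickets (id, done) VALUES (?, NULL)",
        [(i,) for i in range(1, row_count + 1)],
    )
    return conn


def mark_done(conn, ids):
    conn.executemany("UPDATE tickets SET done = 1 WHERE id = ?", [(i,) for i in ids])


def backfill_with_offset(conn, batch_size):
    """Page with OFFSET over a filter that shrinks as rows are updated."""
    offset = 0
    while True:
        ids = [row[0] for row in conn.execute(
            "SELECT id FROM tickets WHERE done IS NULL "
            "ORDER BY id LIMIT ? OFFSET ?", (batch_size, offset))]
        if not ids:
            break
        mark_done(conn, ids)
        offset += batch_size


def backfill_with_keyset(conn, batch_size):
    """Page by remembering the last primary key seen."""
    last_id = 0
    while True:
        ids = [row[0] for row in conn.execute(
            "SELECT id FROM tickets WHERE done IS NULL AND id > ? "
            "ORDER BY id LIMIT ?", (last_id, batch_size))]
        if not ids:
            break
        mark_done(conn, ids)
        last_id = ids[-1]


def remaining(conn):
    return conn.execute("SELECT COUNT(*) FROM tickets WHERE done IS NULL").fetchone()[0]


offset_db = make_db(10)
backfill_with_offset(offset_db, batch_size=3)
offset_left = remaining(offset_db)

keyset_db = make_db(10)
backfill_with_keyset(keyset_db, batch_size=3)
keyset_left = remaining(keyset_db)
```

With 10 rows and a batch size of 3, the OFFSET variant stops with 4 rows still unprocessed, while the keyset variant processes all of them.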

Running Migrations

Upgrade Database to Latest

Upgrade to latest revision (head)

alembic upgrade head

See what would be executed (SQL only, don't run)

alembic upgrade head --sql

Upgrade one step at a time

alembic upgrade +1

Upgrade to specific revision

alembic upgrade abc123

Downgrade Database

Downgrade one revision

alembic downgrade -1

Downgrade to specific revision

alembic downgrade abc123

Downgrade to base (empty database)

alembic downgrade base

Generate SQL for downgrade without executing (offline mode needs an explicit <from>:<to> range)

alembic downgrade abc123:def456 --sql

Check Current Status

Show current database revision

alembic current

Show current revision with details

alembic current --verbose

Show migration history

alembic history

Show history with current revision marked

alembic history --indicate-current

Show specific revision range

alembic history -r base:head

Branching and Merging

Why Branch Migrations?

In customer support systems, you might have:

  • Feature branches: New features developed in parallel

  • Hotfix branches: Urgent fixes that can't wait for feature completion

  • Team branches: Multiple teams working on different modules

Creating a Branch

Create base for new branch (the --version-path directory must also be listed under version_locations in alembic.ini)

alembic revision -m "create reporting branch" \
    --head=base \
    --branch-label=reporting \
    --version-path=alembic/versions/reporting

Add migration to specific branch

alembic revision -m "add report tables" \
    --head=reporting@head

Example branch structure:

base
├── main branch
│   ├── abc123: initial schema
│   ├── def456: add tickets
│   └── ghi789: add users
└── reporting branch
    ├── rep001: create reports table
    └── rep002: add scheduled reports

Working with Multiple Branches

Show all branch heads

alembic heads

Show branch points

alembic branches

Upgrade specific branch

alembic upgrade reporting@head

Upgrade all branches

alembic upgrade heads

Merging Branches

When features are ready to merge:

Merge two branches

alembic merge -m "merge reporting into main" \
    main@head reporting@head

Generated merge migration:

"""merge reporting into main

Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'merge001'
down_revision = ('ghi789', 'rep002')  # Multiple parents
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Usually empty for simple merges
    # Add code if you need to reconcile conflicting changes
    pass


def downgrade() -> None:
    pass

Cross-Branch Dependencies

When one branch depends on another:

Create migration that depends on specific revision from another branch (def456 is a revision from the main branch)

alembic revision -m "reporting needs user table" \
    --head=reporting@head \
    --depends-on=def456

Testing Migrations

Unit Testing Migrations

# tests/test_migrations.py

import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker


@pytest.fixture
def alembic_config():
    """Provide Alembic configuration for testing"""
    config = Config("alembic.ini")
    config.set_main_option(
        "sqlalchemy.url",
        "postgresql://localhost/support_test"
    )
    return config


@pytest.fixture
def test_db(alembic_config):
    """Create test database and apply migrations"""
    # Create engine
    engine = create_engine(
        alembic_config.get_main_option("sqlalchemy.url")
    )

    # Run migrations to head
    command.upgrade(alembic_config, "head")

    yield engine

    # Cleanup - downgrade to base
    command.downgrade(alembic_config, "base")
    engine.dispose()


def test_migration_creates_tickets_table(test_db):
    """Test that migrations create expected tables"""
    inspector = inspect(test_db)
    tables = inspector.get_table_names()

    assert 'tickets' in tables
    assert 'users' in tables
    assert 'customer_satisfaction' in tables


def test_tickets_table_structure(test_db):
    """Test ticket table has correct columns"""
    inspector = inspect(test_db)
    columns = {col['name']: col for col in inspector.get_columns('tickets')}

    assert 'id' in columns
    assert 'priority' in columns
    assert 'status' in columns
    assert 'created_at' in columns
    assert 'resolution_time_seconds' in columns

    # Check column types
    assert columns['priority']['type'].python_type == str
    assert columns['status']['type'].python_type == str


def test_migration_upgrade_downgrade_cycle(alembic_config):
    """Test that upgrade -> downgrade -> upgrade works"""
    # Start at base
    command.downgrade(alembic_config, "base")

    # Upgrade to head
    command.upgrade(alembic_config, "head")

    # Downgrade one step
    command.downgrade(alembic_config, "-1")

    # Upgrade back to head
    command.upgrade(alembic_config, "head")

    # Should complete without errors


def test_data_migration_preserves_data(test_db):
    """Test that data migrations don't lose data"""
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()

    # Insert test data
    ticket = Ticket(
        title="Test ticket",
        status="OPEN",
        priority="high"
    )
    session.add(ticket)
    session.commit()
    ticket_id = ticket.id
    session.close()

    # Run a migration that modifies tickets table
    # (This would be a specific revision)
    # command.upgrade(alembic_config, "specific_revision")

    # Verify data still exists
    session = Session()
    retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
    assert retrieved is not None
    assert retrieved.title == "Test ticket"
    session.close()

Integration Testing

# tests/test_migration_integration.py
# (assumes the alembic_config and test_db fixtures are shared, e.g. via conftest.py)

import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext


def test_no_pending_migrations(alembic_config, test_db):
    """Ensure all migrations are applied in test environment"""
    script = ScriptDirectory.from_config(alembic_config)

    with test_db.connect() as connection:
        context = MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        script_heads = set(script.get_heads())

        assert current_heads == script_heads, \
            f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"


def test_migration_order_is_valid(alembic_config):
    """Verify migration chain has no gaps or conflicts"""
    script = ScriptDirectory.from_config(alembic_config)

    # Get all revisions
    revisions = list(script.walk_revisions())

    # Check each revision has valid down_revision
    for revision in revisions:
        if revision.down_revision is not None:
            if isinstance(revision.down_revision, tuple):
                # Merge point
                for down_rev in revision.down_revision:
                    assert script.get_revision(down_rev) is not None
            else:
                assert script.get_revision(revision.down_revision) is not None


def test_check_command_detects_drift(alembic_config, test_db):
    """Test that check command detects schema drift"""
    # This test verifies that alembic check works correctly
    try:
        command.check(alembic_config)
        # If no exception, database matches models
    except Exception as e:
        # If exception, there's drift between DB and models
        pytest.fail(f"Schema drift detected: {e}")

Testing Migration Performance

# tests/test_migration_performance.py

import time

import pytest
from alembic import command


def test_migration_completes_within_time_limit(alembic_config):
    """Ensure migrations complete within acceptable time"""
    # Downgrade to base
    command.downgrade(alembic_config, "base")

    # Time the upgrade
    start = time.time()
    command.upgrade(alembic_config, "head")
    duration = time.time() - start

    # Assert completes within 60 seconds
    assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"


@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
    """Test data migration performance with realistic data volume"""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()

    # Create 10,000 test tickets
    tickets = [
        Ticket(
            title=f"Test ticket {i}",
            status="OPEN",
            priority="normal"
        )
        for i in range(10000)
    ]
    session.bulk_save_objects(tickets)
    session.commit()
    session.close()

    # Run data migration and measure time
    start = time.time()
    command.upgrade(alembic_config, "data002")  # Specific data migration
    duration = time.time() - start

    # Should process 10k records in reasonable time
    assert duration < 30, f"Data migration took {duration}s for 10k records"

CI/CD Integration

GitHub Actions Workflow

# .github/workflows/migrations.yml

name: Database Migrations

on:
  pull_request:
    paths:
      - 'alembic/versions/**'
      - 'myapp/models/**'
      - 'alembic.ini'
      - 'alembic/env.py'
  push:
    branches:
      - main
      - develop

jobs:
  test-migrations:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: support_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run migration tests
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          # Test upgrade to head
          alembic upgrade head

          # Test downgrade to base
          alembic downgrade base

          # Test upgrade again
          alembic upgrade head

          # Run pytest for migration tests
          pytest tests/test_migrations.py -v

      - name: Check for schema drift
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          alembic check

      - name: Validate migration history
        run: |
          # Check for multiple heads (should be only one)
          HEADS_COUNT=$(alembic heads | wc -l)
          if [ "$HEADS_COUNT" -gt 1 ]; then
            echo "ERROR: Multiple heads detected. Please merge branches."
            alembic heads
            exit 1
          fi

  review-migration-sql:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Generate SQL for review
        run: |
          # Generate SQL without executing
          alembic upgrade head --sql > migration.sql

      - name: Upload SQL artifact
        uses: actions/upload-artifact@v4
        with:
          name: migration-sql
          path: migration.sql

      - name: Comment PR with SQL
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const sql = fs.readFileSync('migration.sql', 'utf8');

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
            });

Deployment Script

#!/bin/bash
# scripts/deploy_migrations.sh

set -e  # Exit on error

echo "Starting database migration deployment..."

# Environment variables
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
# Exported so that env.py can read it via os.getenv('DATABASE_URL')
export DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"

# Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"

# Create backup directory
mkdir -p "$BACKUP_DIR"

# 1. Backup database before migration
# --clean --if-exists makes the dump drop existing objects when it is restored
echo "Creating database backup..."
pg_dump --clean --if-exists "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"

# 2. Check current migration status
echo "Current migration status:"
alembic current

# 3. Show pending migrations
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head"

# 4. Run migrations with timeout
echo "Running migrations..."
timeout 300 alembic upgrade head || {
    echo "ERROR: Migration failed or timed out!"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
}

# 5. Verify migration success
# Both commands print lines such as "abc123 (head)"; take the first field
echo "Verifying migration status..."
CURRENT_REV=$(alembic current | awk '{print $1}')
HEAD_REV=$(alembic heads | awk '{print $1}')

if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
    echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
fi

echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"

# 6. Cleanup old backups (keep last 10)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm

echo "Deployment complete!"

Production Best Practices

Pre-Deployment Checklist

  • Migration tested in development environment

  • Migration tested in staging with production-like data

  • Migration reviewed by at least one team member

  • Downgrade path tested and verified

  • Performance impact assessed for large tables

  • Database backup plan in place

  • Rollback procedure documented

  • Maintenance window scheduled (if needed)

  • Team notified of deployment

  • Monitoring alerts configured

Zero-Downtime Migrations

For critical support systems that can't go offline:

Phase 1: Additive Changes

"""add new column (phase 1)

Revision ID: zd001
"""

from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    # Add new column as nullable
    op.add_column('tickets',
        sa.Column('new_field', sa.String(100), nullable=True)
    )


def downgrade() -> None:
    op.drop_column('tickets', 'new_field')

Phase 2: Data Migration (Background)

"""populate new column (phase 2)

Revision ID: zd002
"""

import time

from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    # Update in small batches during low-traffic periods
    connection = op.get_bind()

    # PostgreSQL has no UPDATE ... LIMIT, so the batch is picked in a subquery.
    # calculate_value() stands in for your own SQL expression or function.
    batch_update = sa.text(
        """
        UPDATE tickets
        SET new_field = calculate_value(old_field)
        WHERE id IN (
            SELECT id FROM tickets
            WHERE new_field IS NULL
            LIMIT :batch_size
        )
        """
    )

    batch_size = 100
    while True:
        result = connection.execute(batch_update, {"batch_size": batch_size})
        if result.rowcount == 0:
            break

        # Small delay to reduce database load
        time.sleep(0.1)


def downgrade() -> None:
    op.execute("UPDATE tickets SET new_field = NULL")

Phase 3: Make Required

"""make new column required (phase 3)

Revision ID: zd003
"""

from alembic import op


def upgrade() -> None:
    # Now that all rows have values, make it non-nullable
    op.alter_column('tickets', 'new_field',
        nullable=False,
        server_default='default_value'
    )


def downgrade() -> None:
    op.alter_column('tickets', 'new_field',
        nullable=True,
        server_default=None
    )

Phase 4: Remove Old Column (Optional)

"""remove old column (phase 4)

Revision ID: zd004
"""

from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    op.drop_column('tickets', 'old_field')


def downgrade() -> None:
    op.add_column('tickets',
        sa.Column('old_field', sa.String(100), nullable=True)
    )
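Because the phases ship as separate deployments, it can help to confirm that the backfill has actually finished before the column is made required. The sketch below shows one possible guard, using an in-memory SQLite table for illustration. The helper names count_unfilled and ready_for_not_null are hypothetical; inside a real migration the same COUNT query would run on the connection from op.get_bind().

```python
import sqlite3


def count_unfilled(conn, table, column):
    """Count rows where `column` is still NULL.

    Table and column names are interpolated into the SQL, so pass only
    trusted identifiers (here they are hard-coded by the caller).
    """
    query = f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    return conn.execute(query).fetchone()[0]


def ready_for_not_null(conn, table, column):
    """True when every row has a value, so a NOT NULL constraint can be added."""
    return count_unfilled(conn, table, column) == 0


# Illustrative data: three tickets, none backfilled yet.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tickets (id INTEGER PRIMARY KEY, old_field TEXT, new_field TEXT)"
)
conn.executemany(
    "INSERT INTO tickets (old_field) VALUES (?)", [("a",), ("b",), ("c",)]
)

before_backfill = ready_for_not_null(conn, "tickets", "new_field")

# Stand-in for the backfill step: copy the old value across.
conn.execute("UPDATE tickets SET new_field = old_field WHERE new_field IS NULL")

after_backfill = ready_for_not_null(conn, "tickets", "new_field")
```

Failing fast when the check returns False avoids starting an ALTER that the database would reject anyway.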

Handling Migration Failures

# alembic/env.py additions for error handling
# (config and target_metadata are defined earlier in env.py)

import logging

from alembic import context
from sqlalchemy import engine_from_config, pool

logger = logging.getLogger('alembic.env')


def run_migrations_online():
    """Run migrations in 'online' mode with error handling"""

    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,  # Rollback individual migrations
            compare_type=True,
            compare_server_default=True
        )

        try:
            with context.begin_transaction():
                context.run_migrations()

        except Exception as e:
            logger.error(f"Migration failed: {e}")
            logger.error("Rolling back transaction...")
            # Transaction automatically rolled back
            raise

        else:
            logger.info("Migration completed successfully")

Advanced Configuration

Custom Migration Template

Create custom template for your organization:

alembic/script.py.mako

Note that author and jira_ticket are not variables Alembic supplies on its own; they can be passed through the template_args option of context.configure() in env.py.

"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade() -> None:
    """Apply migration changes"""
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    """Revert migration changes"""
    ${downgrades if downgrades else "pass"}

Multi-Database Support

For systems with separate databases (e.g., main DB + analytics):

# alembic/env.py for multiple databases
# (main_metadata and analytics_metadata are your models' MetaData objects)

import os

from sqlalchemy import create_engine


def run_migrations_online():
    """Run migrations for multiple databases"""

    # Configuration for each database
    engines = {
        'main': {
            'url': os.getenv('MAIN_DB_URL'),
            'target_metadata': main_metadata
        },
        'analytics': {
            'url': os.getenv('ANALYTICS_DB_URL'),
            'target_metadata': analytics_metadata
        }
    }

    # db_config avoids shadowing the module-level Alembic config object
    for name, db_config in engines.items():
        logger.info(f"Running migrations for {name} database")

        engine = create_engine(db_config['url'])

        with engine.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=db_config['target_metadata'],
                upgrade_token=f"{name}_upgrade",
                downgrade_token=f"{name}_downgrade"
            )

            with context.begin_transaction():
                context.run_migrations(engine_name=name)

Troubleshooting

Common Issues and Solutions

Multiple Heads Error

Problem: "Multiple heads exist"

Solution: Merge the branches

alembic merge heads -m "merge branches"
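What counts as a "head" can be illustrated without Alembic at all: a head is a revision that no other revision names as its parent. The sketch below reuses the revision ids from the branch structure example. The helper find_heads is hypothetical and deliberately simplified; it ignores depends_on and branch labels, which Alembic itself takes into account.

```python
def find_heads(down_revisions):
    """Return revisions that no other revision lists as a parent.

    `down_revisions` maps a revision id to its parent id, a tuple of
    parent ids (merge revision), or None (start of a branch).
    """
    parents = set()
    for down in down_revisions.values():
        if down is None:
            continue
        if isinstance(down, tuple):
            parents.update(down)
        else:
            parents.add(down)
    return sorted(rev for rev in down_revisions if rev not in parents)


# Two independent lines of history, as in the branch structure example.
graph = {
    "abc123": None,
    "def456": "abc123",
    "ghi789": "def456",
    "rep001": None,
    "rep002": "rep001",
}
heads_before = find_heads(graph)

# A merge revision names both former heads as parents.
graph["merge001"] = ("ghi789", "rep002")
heads_after = find_heads(graph)
```

Before the merge there are two heads (ghi789 and rep002); afterwards merge001 is the only one, which is the state "alembic upgrade head" expects.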

Migration Out of Sync

Problem: Database revision doesn't match migration history

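Solution: Stamp database to specific revision (this only updates the alembic_version table and runs no migrations, so the schema must already match that revision)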

alembic stamp head

Or stamp to specific revision

alembic stamp abc123
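To make the effect of stamping concrete, here is a rough emulation on an in-memory SQLite database. It assumes only what Alembic documents about its bookkeeping (a table named alembic_version with a version_num column); the stamp function below is a hypothetical stand-in, not Alembic's implementation.

```python
import sqlite3


def stamp(conn, revision):
    """Record `revision` as current without running any migration code."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS alembic_version "
        "(version_num VARCHAR(32) NOT NULL)"
    )
    conn.execute("DELETE FROM alembic_version")
    conn.execute(
        "INSERT INTO alembic_version (version_num) VALUES (?)", (revision,)
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY)")

# abc123 is the revision that would add tickets.priority if it were run.
stamp(conn, "abc123")

current = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
columns = [row[1] for row in conn.execute("PRAGMA table_info(tickets)")]
```

After stamping, the recorded revision is abc123 but the tickets table still has only its id column, which is why stamping a database whose schema does not match the revision hides the mismatch instead of fixing it.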

Failed Migration Cleanup

Problem: Migration failed midway

Solution: Manual cleanup

1. Check current state

alembic current

2. Manually fix database issues

psql $DATABASE_URL

3. Stamp to correct revision

alembic stamp previous_working_revision

4. Try migration again

alembic upgrade head

Circular Dependencies

Problem: "Circular dependency detected"

Solution: Use depends_on instead of down_revision

alembic revision -m "fix circular dependency" \
    --head=branch_a@head \
    --depends-on=branch_b_revision

Summary

This skill covered comprehensive Alembic usage for customer support systems:

  • Setup: Installation, configuration, and initialization

  • Creating Migrations: Manual and autogenerated approaches

  • Data Migrations: Transforming data during schema changes

  • Running Migrations: Upgrade, downgrade, and status commands

  • Branching: Managing parallel development streams

  • Testing: Unit, integration, and performance testing

  • CI/CD: Automation and deployment strategies

  • Production: Zero-downtime migrations and best practices

  • Advanced: Custom templates and multi-database support

  • Troubleshooting: Common issues and solutions

Always remember:

  • Review autogenerated migrations

  • Test migrations thoroughly before production

  • Keep backups before major migrations

  • Plan for rollback scenarios

  • Monitor migration performance

  • Document complex migrations

For more examples, see EXAMPLES.md in this skill package.
