APM

>Agent Skill

@aiskillstore/alembic

skilldevelopment

Database migration management for SQLAlchemy projects using Alembic

apm::install
$apm install @aiskillstore/alembic
apm::skill.md
---
name: alembic
description: Database migration management for SQLAlchemy projects using Alembic
when_to_use: When you need to create, apply, or manage database schema changes in SQLAlchemy projects
---

# Alembic Database Migrations

Alembic is a database migration tool for SQLAlchemy projects that provides version control for your database schema.

## Quick Start

### Create Migration (Autogenerate)

```bash
# Generate migration from model changes
uv run alembic revision --autogenerate -m "Add user table"

# Check if there are pending changes
uv run alembic check
```

### Apply Migrations

```bash
# Upgrade to latest version
uv run alembic upgrade head

# Upgrade to specific revision
uv run alembic upgrade ae1027a6acf

# Downgrade one revision
uv run alembic downgrade -1

# Downgrade to base (empty schema)
uv run alembic downgrade base
```

### Check Status

```bash
# Show current database revision
uv run alembic current

# Show all revision history
uv run alembic history

# Show revision details
uv run alembic show ae1027a6acf
```

## Common Patterns

### Autogenerate Configuration

**env.py setup for async SQLAlchemy:**

```python
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# Import your models
from app.models import Base
from app.config import get_settings

config = context.config
settings = get_settings()

# Configure database URL for async
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://")
config.set_main_option("sqlalchemy.url", database_url)

target_metadata = Base.metadata

async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        render_as_batch=False,  # Set to True for SQLite
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

### Manual Migration Operations

**Common schema changes:**

```python
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Add column
    op.add_column('users', sa.Column('email', sa.String(255), nullable=True))

    # Rename table
    op.rename_table('old_table', 'new_table')

    # Create index
    op.create_index('ix_users_email', 'users', ['email'])

    # Add constraint
    op.create_check_constraint('ck_age_positive', 'users', 'age > 0')

def downgrade():
    # Reverse operations
    op.drop_constraint('ck_age_positive', 'users')
    op.drop_index('ix_users_email')
    op.rename_table('new_table', 'old_table')
    op.drop_column('users', 'email')
```

### Batch Mode (for SQLite)

**Configure batch mode in env.py:**

```python
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_as_batch=True  # Required for SQLite migrations
)
```

**Generated batch migration:**

```python
def upgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:
        batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True))
        batch_op.create_index('ix_users_email', ['email'], unique=False)
```

### Filtering Objects

**Skip certain objects in autogenerate:**

```python
def include_object(object, name, type_, reflected, compare_to):
    # Skip temporary tables
    if type_ == "table" and name.startswith("temp_"):
        return False

    # Skip columns with skip_autogenerate flag
    if type_ == "column" and not reflected:
        if object.info.get("skip_autogenerate", False):
            return False

    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object
)
```

**Filter by schema:**

```python
def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name in [None, "public", "auth"]  # Include default + specific schemas
    elif type_ == "table":
        return parent_names["schema_qualified_table_name"] in target_metadata.tables
    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_name=include_name,
    include_schemas=True
)
```

### Custom Migration Processing

**Modify generated migrations:**

```python
def process_revision_directives(context, revision, directives):
    script = directives[0]

    # Skip empty migrations
    if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
        directives[:] = []
        return

    # Remove downgrade operations for one-way migrations
    script.downgrade_ops.ops[:] = []

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=process_revision_directives
)
```

### Data Migrations

**Migrate data during schema change:**

```python
def upgrade():
    # Add new column
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))

    # Migrate data
    connection = op.get_bind()
    connection.execute(
        sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )

    # Make column required after data migration
    op.alter_column('users', 'full_name', nullable=False)

def downgrade():
    op.drop_column('users', 'full_name')
```

### Branch Migrations

**Work with multiple branches:**

```bash
# Create branch
uv run alembic revision -m "Create feature branch" --head=base --branch-label=feature_x

# Upgrade specific branch
uv run alembic upgrade feature_x@head

# Merge branches
uv run alembic merge -m "Merge feature_x into main" feature_x@head main@head
```

## Practical Code Snippets

### Check if Database is Up-to-Date

```python
from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import create_engine

def is_database_up_to_date(alembic_cfg_path, database_url):
    """Check if database schema matches latest migrations"""
    cfg = config.Config(alembic_cfg_path)
    directory = script.ScriptDirectory.from_config(cfg)

    engine = create_engine(database_url)
    with engine.begin() as connection:
        context = migration.MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        latest_heads = set(directory.get_heads())
        return current_heads == latest_heads
```

### Programmatically Run Migrations

```python
from alembic import command
from alembic.config import Config

def run_migrations(alembic_ini_path):
    """Run all pending migrations"""
    alembic_cfg = Config(alembic_ini_path)
    command.upgrade(alembic_cfg, "head")

def create_migration(alembic_ini_path, message, autogenerate=True):
    """Create new migration"""
    alembic_cfg = Config(alembic_ini_path)
    command.revision(alembic_cfg, message=message, autogenerate=autogenerate)
```

### Custom Migration Operations

```python
from alembic.autogenerate import rewriter
from alembic.operations import ops

writer = rewriter.Rewriter()

@writer.rewrites(ops.AddColumnOp)
def add_column_non_nullable(context, revision, op):
    """Add non-nullable columns in two steps"""
    if not op.column.nullable:
        op.column.nullable = True
        return [
            op,
            ops.AlterColumnOp(
                op.table_name,
                op.column.name,
                nullable=False,
                existing_type=op.column.type,
                schema=op.schema
            )
        ]
    return op

# Use in env.py
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=writer
)
```

## Requirements

- **Python 3.8+**: Required for async support
- **SQLAlchemy 2.0+**: For modern async patterns
- **PostgreSQL/MySQL/SQLite**: Supported databases
- **Alembic 1.8+**: Migration tooling

### Common Dependencies

```bash
# Core dependencies
uv add alembic sqlalchemy

# For PostgreSQL with async
uv add asyncpg

# For MySQL with async
uv add aiomysql

# For SQLite (built-in)
# No additional packages needed
```

### Development Setup

```bash
# Initialize Alembic in existing project
uv run alembic init alembic

# Configure env.py for your models
# Edit alembic.ini for database URL

# First migration
uv run alembic revision --autogenerate -m "Initial schema"
uv run alembic upgrade head
```